Skip to content

Commit 1f8fca0

Browse files
mccheahash211
authored andcommitted
Extract SSL configuration handling to a separate class (apache-spark-on-k8s#123)
* Extract SSL configuration to a separate class * KubernetesSsl -> Ssl, container -> local
1 parent 622bfdb commit 1f8fca0

File tree

2 files changed

+232
-181
lines changed

2 files changed

+232
-181
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala

Lines changed: 29 additions & 181 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,20 @@
1616
*/
1717
package org.apache.spark.deploy.kubernetes
1818

19-
import java.io.{File, FileInputStream}
20-
import java.security.{KeyStore, SecureRandom}
19+
import java.io.File
20+
import java.security.SecureRandom
2121
import java.util
2222
import java.util.concurrent.{CountDownLatch, TimeUnit}
23-
import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager}
2423

25-
import com.google.common.base.Charsets
2624
import com.google.common.io.Files
2725
import com.google.common.util.concurrent.SettableFuture
2826
import io.fabric8.kubernetes.api.model._
2927
import io.fabric8.kubernetes.client.{ConfigBuilder => K8SConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watcher}
3028
import io.fabric8.kubernetes.client.Watcher.Action
3129
import org.apache.commons.codec.binary.Base64
3230
import scala.collection.JavaConverters._
33-
import scala.collection.mutable
3431

35-
import org.apache.spark.{SecurityManager, SparkConf, SparkException, SSLOptions}
32+
import org.apache.spark.{SparkConf, SparkException}
3633
import org.apache.spark.deploy.kubernetes.config._
3734
import org.apache.spark.deploy.kubernetes.constants._
3835
import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, UploadedAppResource}
@@ -56,8 +53,6 @@ private[spark] class Client(
5653
private val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
5754
private val secretName = s"$SUBMISSION_APP_SECRET_PREFIX-$kubernetesAppId"
5855
private val secretDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId"
59-
private val sslSecretsDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId-ssl"
60-
private val sslSecretsName = s"$SUBMISSION_SSL_SECRETS_PREFIX-$kubernetesAppId"
6156
private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE)
6257
private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT)
6358
private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT)
@@ -95,7 +90,6 @@ private[spark] class Client(
9590
throw new SparkException(s"Main app resource file $mainAppResource is not a file or" +
9691
s" is a directory.")
9792
}
98-
val (driverSubmitSslOptions, isKeyStoreLocalFile) = parseDriverSubmitSslOptions()
9993
val parsedCustomLabels = parseCustomLabels(customLabels)
10094
var k8ConfBuilder = new K8SConfigBuilder()
10195
.withApiVersion("v1")
@@ -115,6 +109,8 @@ private[spark] class Client(
115109
Utils.tryWithResource(new DefaultKubernetesClient(k8ClientConfig)) { kubernetesClient =>
116110
ShutdownHookManager.addShutdownHook(() =>
117111
kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient))
112+
val sslConfigurationProvider = new SslConfigurationProvider(
113+
sparkConf, kubernetesAppId, kubernetesClient, kubernetesResourceCleaner)
118114
val submitServerSecret = kubernetesClient.secrets().createNew()
119115
.withNewMetadata()
120116
.withName(secretName)
@@ -124,10 +120,7 @@ private[spark] class Client(
124120
.done()
125121
kubernetesResourceCleaner.registerOrUpdateResource(submitServerSecret)
126122
try {
127-
val (sslEnvs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl(
128-
kubernetesClient,
129-
driverSubmitSslOptions,
130-
isKeyStoreLocalFile)
123+
val sslConfiguration = sslConfigurationProvider.getSslConfiguration()
131124
// start outer watch for status logging of driver pod
132125
val driverPodCompletedLatch = new CountDownLatch(1)
133126
// only enable interval logging if in waitForAppCompletion mode
@@ -142,21 +135,16 @@ private[spark] class Client(
142135
kubernetesClient,
143136
parsedCustomLabels,
144137
submitServerSecret,
145-
driverSubmitSslOptions,
146-
sslSecrets,
147-
sslVolumes,
148-
sslVolumeMounts,
149-
sslEnvs,
150-
isKeyStoreLocalFile)
138+
sslConfiguration)
151139
configureOwnerReferences(
152140
kubernetesClient,
153141
submitServerSecret,
154-
sslSecrets,
142+
sslConfiguration.sslSecrets,
155143
driverPod,
156144
driverService)
157145
submitApplicationToDriverServer(
158146
kubernetesClient,
159-
driverSubmitSslOptions,
147+
sslConfiguration,
160148
driverService,
161149
submitterLocalFiles,
162150
submitterLocalJars)
@@ -182,7 +170,7 @@ private[spark] class Client(
182170

183171
private def submitApplicationToDriverServer(
184172
kubernetesClient: KubernetesClient,
185-
driverSubmitSslOptions: SSLOptions,
173+
sslConfiguration: SslConfiguration,
186174
driverService: Service,
187175
submitterLocalFiles: Iterable[String],
188176
submitterLocalJars: Iterable[String]): Unit = {
@@ -198,7 +186,7 @@ private[spark] class Client(
198186
sparkConf.setIfMissing("spark.blockmanager.port",
199187
DEFAULT_BLOCKMANAGER_PORT.toString)
200188
val driverSubmitter = buildDriverSubmissionClient(kubernetesClient, driverService,
201-
driverSubmitSslOptions)
189+
sslConfiguration)
202190
// Sanity check to see if the driver submitter is even reachable.
203191
driverSubmitter.ping()
204192
logInfo(s"Submitting local resources to driver pod for application " +
@@ -229,20 +217,15 @@ private[spark] class Client(
229217
kubernetesClient: KubernetesClient,
230218
parsedCustomLabels: Map[String, String],
231219
submitServerSecret: Secret,
232-
driverSubmitSslOptions: SSLOptions,
233-
sslSecrets: Array[Secret],
234-
sslVolumes: Array[Volume],
235-
sslVolumeMounts: Array[VolumeMount],
236-
sslEnvs: Array[EnvVar],
237-
isKeyStoreLocalFile: Boolean): (Pod, Service) = {
238-
val endpointsReadyFuture = SettableFuture.create[Endpoints]
239-
val endpointsReadyWatcher = new DriverEndpointsReadyWatcher(endpointsReadyFuture)
240-
val serviceReadyFuture = SettableFuture.create[Service]
220+
sslConfiguration: SslConfiguration): (Pod, Service) = {
241221
val driverKubernetesSelectors = (Map(
242222
SPARK_DRIVER_LABEL -> kubernetesAppId,
243223
SPARK_APP_ID_LABEL -> kubernetesAppId,
244224
SPARK_APP_NAME_LABEL -> appName)
245225
++ parsedCustomLabels).asJava
226+
val endpointsReadyFuture = SettableFuture.create[Endpoints]
227+
val endpointsReadyWatcher = new DriverEndpointsReadyWatcher(endpointsReadyFuture)
228+
val serviceReadyFuture = SettableFuture.create[Service]
246229
val serviceReadyWatcher = new DriverServiceReadyWatcher(serviceReadyFuture)
247230
val podReadyFuture = SettableFuture.create[Pod]
248231
val podWatcher = new DriverPodReadyWatcher(podReadyFuture)
@@ -267,10 +250,7 @@ private[spark] class Client(
267250
kubernetesClient,
268251
driverKubernetesSelectors,
269252
submitServerSecret,
270-
driverSubmitSslOptions,
271-
sslVolumes,
272-
sslVolumeMounts,
273-
sslEnvs)
253+
sslConfiguration)
274254
kubernetesResourceCleaner.registerOrUpdateResource(driverPod)
275255
waitForReadyKubernetesComponents(kubernetesClient, endpointsReadyFuture,
276256
serviceReadyFuture, podReadyFuture)
@@ -386,13 +366,10 @@ private[spark] class Client(
386366
kubernetesClient: KubernetesClient,
387367
driverKubernetesSelectors: util.Map[String, String],
388368
submitServerSecret: Secret,
389-
driverSubmitSslOptions: SSLOptions,
390-
sslVolumes: Array[Volume],
391-
sslVolumeMounts: Array[VolumeMount],
392-
sslEnvs: Array[EnvVar]): Pod = {
369+
sslConfiguration: SslConfiguration): Pod = {
393370
val containerPorts = buildContainerPorts()
394371
val probePingHttpGet = new HTTPGetActionBuilder()
395-
.withScheme(if (driverSubmitSslOptions.enabled) "HTTPS" else "HTTP")
372+
.withScheme(if (sslConfiguration.sslOptions.enabled) "HTTPS" else "HTTP")
396373
.withPath("/v1/submissions/ping")
397374
.withNewPort(SUBMISSION_SERVER_PORT_NAME)
398375
.build()
@@ -409,7 +386,7 @@ private[spark] class Client(
409386
.withSecretName(submitServerSecret.getMetadata.getName)
410387
.endSecret()
411388
.endVolume()
412-
.addToVolumes(sslVolumes: _*)
389+
.addToVolumes(sslConfiguration.sslPodVolumes: _*)
413390
.withServiceAccount(serviceAccount)
414391
.addNewContainer()
415392
.withName(DRIVER_CONTAINER_NAME)
@@ -420,7 +397,7 @@ private[spark] class Client(
420397
.withMountPath(secretDirectory)
421398
.withReadOnly(true)
422399
.endVolumeMount()
423-
.addToVolumeMounts(sslVolumeMounts: _*)
400+
.addToVolumeMounts(sslConfiguration.sslPodVolumeMounts: _*)
424401
.addNewEnv()
425402
.withName(ENV_SUBMISSION_SECRET_LOCATION)
426403
.withValue(s"$secretDirectory/$SUBMISSION_APP_SECRET_NAME")
@@ -429,7 +406,7 @@ private[spark] class Client(
429406
.withName(ENV_SUBMISSION_SERVER_PORT)
430407
.withValue(SUBMISSION_SERVER_PORT.toString)
431408
.endEnv()
432-
.addToEnv(sslEnvs: _*)
409+
.addToEnv(sslConfiguration.sslPodEnvVars: _*)
433410
.withPorts(containerPorts.asJava)
434411
.withNewReadinessProbe().withHttpGet(probePingHttpGet).endReadinessProbe()
435412
.endContainer()
@@ -486,108 +463,6 @@ private[spark] class Client(
486463
}
487464
}
488465

489-
private def parseDriverSubmitSslOptions(): (SSLOptions, Boolean) = {
490-
val maybeKeyStore = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_KEYSTORE)
491-
val resolvedSparkConf = sparkConf.clone()
492-
val (isLocalKeyStore, resolvedKeyStore) = maybeKeyStore.map(keyStore => {
493-
(KubernetesFileUtils.isUriLocalFile(keyStore),
494-
Option.apply(Utils.resolveURI(keyStore).getPath))
495-
}).getOrElse((false, Option.empty[String]))
496-
resolvedKeyStore.foreach {
497-
resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_KEYSTORE, _)
498-
}
499-
sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE).foreach { trustStore =>
500-
if (KubernetesFileUtils.isUriLocalFile(trustStore)) {
501-
resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE,
502-
Utils.resolveURI(trustStore).getPath)
503-
} else {
504-
throw new SparkException(s"Invalid trustStore URI $trustStore; trustStore URI" +
505-
" for submit server must have no scheme, or scheme file://")
506-
}
507-
}
508-
val securityManager = new SecurityManager(resolvedSparkConf)
509-
(securityManager.getSSLOptions(KUBERNETES_SUBMIT_SSL_NAMESPACE), isLocalKeyStore)
510-
}
511-
512-
private def configureSsl(
513-
kubernetesClient: KubernetesClient,
514-
driverSubmitSslOptions: SSLOptions,
515-
isKeyStoreLocalFile: Boolean):
516-
(Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = {
517-
if (driverSubmitSslOptions.enabled) {
518-
val sslSecretsMap = mutable.HashMap[String, String]()
519-
val sslEnvs = mutable.Buffer[EnvVar]()
520-
val secrets = mutable.Buffer[Secret]()
521-
driverSubmitSslOptions.keyStore.foreach(store => {
522-
val resolvedKeyStoreFile = if (isKeyStoreLocalFile) {
523-
if (!store.isFile) {
524-
throw new SparkException(s"KeyStore specified at $store is not a file or" +
525-
s" does not exist.")
526-
}
527-
val keyStoreBytes = Files.toByteArray(store)
528-
val keyStoreBase64 = Base64.encodeBase64String(keyStoreBytes)
529-
sslSecretsMap += (SUBMISSION_SSL_KEYSTORE_SECRET_NAME -> keyStoreBase64)
530-
s"$sslSecretsDirectory/$SUBMISSION_SSL_KEYSTORE_SECRET_NAME"
531-
} else {
532-
store.getAbsolutePath
533-
}
534-
sslEnvs += new EnvVarBuilder()
535-
.withName(ENV_SUBMISSION_KEYSTORE_FILE)
536-
.withValue(resolvedKeyStoreFile)
537-
.build()
538-
})
539-
driverSubmitSslOptions.keyStorePassword.foreach(password => {
540-
val passwordBase64 = Base64.encodeBase64String(password.getBytes(Charsets.UTF_8))
541-
sslSecretsMap += (SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME -> passwordBase64)
542-
sslEnvs += new EnvVarBuilder()
543-
.withName(ENV_SUBMISSION_KEYSTORE_PASSWORD_FILE)
544-
.withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME")
545-
.build()
546-
})
547-
driverSubmitSslOptions.keyPassword.foreach(password => {
548-
val passwordBase64 = Base64.encodeBase64String(password.getBytes(Charsets.UTF_8))
549-
sslSecretsMap += (SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME -> passwordBase64)
550-
sslEnvs += new EnvVarBuilder()
551-
.withName(ENV_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE)
552-
.withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME")
553-
.build()
554-
})
555-
driverSubmitSslOptions.keyStoreType.foreach(storeType => {
556-
sslEnvs += new EnvVarBuilder()
557-
.withName(ENV_SUBMISSION_KEYSTORE_TYPE)
558-
.withValue(storeType)
559-
.build()
560-
})
561-
sslEnvs += new EnvVarBuilder()
562-
.withName(ENV_SUBMISSION_USE_SSL)
563-
.withValue("true")
564-
.build()
565-
val sslVolume = new VolumeBuilder()
566-
.withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME)
567-
.withNewSecret()
568-
.withSecretName(sslSecretsName)
569-
.endSecret()
570-
.build()
571-
val sslVolumeMount = new VolumeMountBuilder()
572-
.withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME)
573-
.withReadOnly(true)
574-
.withMountPath(sslSecretsDirectory)
575-
.build()
576-
val sslSecrets = kubernetesClient.secrets().createNew()
577-
.withNewMetadata()
578-
.withName(sslSecretsName)
579-
.endMetadata()
580-
.withData(sslSecretsMap.asJava)
581-
.withType("Opaque")
582-
.done()
583-
kubernetesResourceCleaner.registerOrUpdateResource(sslSecrets)
584-
secrets += sslSecrets
585-
(sslEnvs.toArray, Array(sslVolume), Array(sslVolumeMount), secrets.toArray)
586-
} else {
587-
(Array[EnvVar](), Array[Volume](), Array[VolumeMount](), Array[Secret]())
588-
}
589-
}
590-
591466
private def buildSubmitFailedErrorMessage(
592467
kubernetesClient: KubernetesClient,
593468
e: Throwable): String = {
@@ -688,8 +563,8 @@ private[spark] class Client(
688563
private def buildDriverSubmissionClient(
689564
kubernetesClient: KubernetesClient,
690565
service: Service,
691-
driverSubmitSslOptions: SSLOptions): KubernetesSparkRestApi = {
692-
val urlScheme = if (driverSubmitSslOptions.enabled) {
566+
sslConfiguration: SslConfiguration): KubernetesSparkRestApi = {
567+
val urlScheme = if (sslConfiguration.sslOptions.enabled) {
693568
"https"
694569
} else {
695570
logWarning("Submitting application details, application secret, and local" +
@@ -714,45 +589,18 @@ private[spark] class Client(
714589
s"$urlScheme://${address.getAddress}:$servicePort"
715590
}).toSet
716591
require(nodeUrls.nonEmpty, "No nodes found to contact the driver!")
717-
val (trustManager, sslContext): (X509TrustManager, SSLContext) =
718-
if (driverSubmitSslOptions.enabled) {
719-
buildSslConnectionConfiguration(driverSubmitSslOptions)
720-
} else {
721-
(null, SSLContext.getDefault)
722-
}
723592
HttpClientUtil.createClient[KubernetesSparkRestApi](
724593
uris = nodeUrls,
725594
maxRetriesPerServer = 3,
726-
sslSocketFactory = sslContext.getSocketFactory,
727-
trustContext = trustManager,
595+
sslSocketFactory = sslConfiguration
596+
.driverSubmitClientSslContext
597+
.getSocketFactory,
598+
trustContext = sslConfiguration
599+
.driverSubmitClientTrustManager
600+
.orNull,
728601
connectTimeoutMillis = 5000)
729602
}
730603

731-
private def buildSslConnectionConfiguration(driverSubmitSslOptions: SSLOptions) = {
732-
driverSubmitSslOptions.trustStore.map(trustStoreFile => {
733-
val trustManagerFactory = TrustManagerFactory.getInstance(
734-
TrustManagerFactory.getDefaultAlgorithm)
735-
val trustStore = KeyStore.getInstance(
736-
driverSubmitSslOptions.trustStoreType.getOrElse(KeyStore.getDefaultType))
737-
if (!trustStoreFile.isFile) {
738-
throw new SparkException(s"TrustStore file at ${trustStoreFile.getAbsolutePath}" +
739-
s" does not exist or is not a file.")
740-
}
741-
Utils.tryWithResource(new FileInputStream(trustStoreFile)) { trustStoreStream =>
742-
driverSubmitSslOptions.trustStorePassword match {
743-
case Some(password) =>
744-
trustStore.load(trustStoreStream, password.toCharArray)
745-
case None => trustStore.load(trustStoreStream, null)
746-
}
747-
}
748-
trustManagerFactory.init(trustStore)
749-
val trustManagers = trustManagerFactory.getTrustManagers
750-
val sslContext = SSLContext.getInstance("TLSv1.2")
751-
sslContext.init(null, trustManagers, SECURE_RANDOM)
752-
(trustManagers(0).asInstanceOf[X509TrustManager], sslContext)
753-
}).getOrElse((null, SSLContext.getDefault))
754-
}
755-
756604
private def parseCustomLabels(maybeLabels: Option[String]): Map[String, String] = {
757605
maybeLabels.map(labels => {
758606
labels.split(",").map(_.trim).filterNot(_.isEmpty).map(label => {

0 commit comments

Comments
 (0)