Commit cfea300

jacek-lewandowski authored and JoshRosen committed
SPARK-3883: SSL support for HttpServer and Akka
SPARK-3883: SSL support for Akka connections and Jetty based file servers. This story introduced the following changes:

- Introduced an SSLOptions object which holds the SSL configuration and can build the appropriate configuration for Akka or Jetty. SSLOptions can be created by parsing SparkConf entries at a specified namespace.
- SSLOptions is created and kept by SecurityManager.
- All Akka actor address creation snippets based on interpolated strings were replaced by dedicated methods from AkkaUtils. Those methods select the proper Akka protocol - whether akka.tcp or akka.ssl.tcp.
- Added test cases for AkkaUtils, FileServer, SSLOptions and SecurityManager.
- Added a way for executors and the driver to use node-local SSL configuration in standalone mode. It can be enabled by setting spark.ssl.useNodeLocalConf in SparkConf.
- Made CoarseGrainedExecutorBackend not overwrite the executor startup configuration settings - they are passed from the Worker anyway.

Refer to #3571 for discussion and details.

Author: Jacek Lewandowski <lewandowski.jacek@gmail.com>
Author: Jacek Lewandowski <jacek.lewandowski@datastax.com>

Closes #3571 from jacek-lewandowski/SPARK-3883-master and squashes the following commits:

9ef4ed1 [Jacek Lewandowski] Merge pull request #2 from jacek-lewandowski/SPARK-3883-docs2
fb31b49 [Jacek Lewandowski] SPARK-3883: Added SSL setup documentation
2532668 [Jacek Lewandowski] SPARK-3883: Refactored AkkaUtils.protocol method to not use Try
90a8762 [Jacek Lewandowski] SPARK-3883: Refactored methods to resolve Akka address and made it possible to easily configure multiple communication layers for SSL
72b2541 [Jacek Lewandowski] SPARK-3883: A reference to the fallback SSLOptions can be provided when constructing SSLOptions
93050f4 [Jacek Lewandowski] SPARK-3883: SSL support for HttpServer and Akka
1 parent ef65cf0 / commit cfea300

36 files changed: +1145 -73 lines changed
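
For orientation, here is the configuration surface this commit introduces, sketched as a SparkConf snippet. This is a minimal illustration based on the options documented in SSLOptions below; the paths and passwords are hypothetical placeholders.

import org.apache.spark.SparkConf

// Global SSL defaults for all supported protocols (hypothetical values).
val conf = new SparkConf()
  .set("spark.ssl.enabled", "true")
  .set("spark.ssl.keyStore", "/path/to/keystore.jks")
  .set("spark.ssl.keyStorePassword", "storePass")
  .set("spark.ssl.keyPassword", "keyPass")
  .set("spark.ssl.trustStore", "/path/to/truststore.jks")
  .set("spark.ssl.trustStorePassword", "trustPass")
  .set("spark.ssl.protocol", "TLSv1")
  .set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA")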

core/src/main/scala/org/apache/spark/HttpServer.scala

Lines changed: 8 additions & 3 deletions
@@ -19,6 +19,7 @@ package org.apache.spark

 import java.io.File

+import org.eclipse.jetty.server.ssl.SslSocketConnector
 import org.eclipse.jetty.util.security.{Constraint, Password}
 import org.eclipse.jetty.security.authentication.DigestAuthenticator
 import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService}

@@ -72,7 +73,10 @@ private[spark] class HttpServer(
    */
   private def doStart(startPort: Int): (Server, Int) = {
     val server = new Server()
-    val connector = new SocketConnector
+
+    val connector = securityManager.fileServerSSLOptions.createJettySslContextFactory()
+      .map(new SslSocketConnector(_)).getOrElse(new SocketConnector)
+
     connector.setMaxIdleTime(60 * 1000)
     connector.setSoLingerTime(-1)
     connector.setPort(startPort)

@@ -149,13 +153,14 @@ private[spark] class HttpServer(
   }

   /**
-   * Get the URI of this HTTP server (http://host:port)
+   * Get the URI of this HTTP server (http://host:port or https://host:port)
    */
   def uri: String = {
     if (server == null) {
       throw new ServerStateException("Server is not started")
     } else {
-      "http://" + Utils.localIpAddress + ":" + port
+      val scheme = if (securityManager.fileServerSSLOptions.enabled) "https" else "http"
+      s"$scheme://${Utils.localIpAddress}:$port"
     }
   }
 }
core/src/main/scala/org/apache/spark/SSLOptions.scala (new file)

Lines changed: 178 additions & 0 deletions

@@ -0,0 +1,178 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import java.io.File

import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import org.eclipse.jetty.util.ssl.SslContextFactory

/**
 * SSLOptions class is a common container for SSL configuration options. It offers methods to
 * generate specific objects to configure SSL for different communication protocols.
 *
 * SSLOptions is intended to provide the maximum common set of SSL settings that are supported
 * by all the protocols for which it can generate configurations. Since Akka doesn't support
 * client authentication with SSL, SSLOptions cannot support it either.
 *
 * @param enabled             enables or disables SSL; if it is set to false, the rest of the
 *                            settings are disregarded
 * @param keyStore            a path to the key-store file
 * @param keyStorePassword    a password to access the key-store file
 * @param keyPassword         a password to access the private key in the key-store
 * @param trustStore          a path to the trust-store file
 * @param trustStorePassword  a password to access the trust-store file
 * @param protocol            SSL protocol (remember that SSLv3 was compromised) supported by Java
 * @param enabledAlgorithms   a set of encryption algorithms to use
 */
private[spark] case class SSLOptions(
    enabled: Boolean = false,
    keyStore: Option[File] = None,
    keyStorePassword: Option[String] = None,
    keyPassword: Option[String] = None,
    trustStore: Option[File] = None,
    trustStorePassword: Option[String] = None,
    protocol: Option[String] = None,
    enabledAlgorithms: Set[String] = Set.empty) {

  /**
   * Creates a Jetty SSL context factory according to the SSL settings represented by this object.
   */
  def createJettySslContextFactory(): Option[SslContextFactory] = {
    if (enabled) {
      val sslContextFactory = new SslContextFactory()

      keyStore.foreach(file => sslContextFactory.setKeyStorePath(file.getAbsolutePath))
      trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath))
      keyStorePassword.foreach(sslContextFactory.setKeyStorePassword)
      trustStorePassword.foreach(sslContextFactory.setTrustStorePassword)
      keyPassword.foreach(sslContextFactory.setKeyManagerPassword)
      protocol.foreach(sslContextFactory.setProtocol)
      sslContextFactory.setIncludeCipherSuites(enabledAlgorithms.toSeq: _*)

      Some(sslContextFactory)
    } else {
      None
    }
  }

  /**
   * Creates an Akka configuration object which contains all the SSL settings represented by this
   * object. It can then be used to compose the ultimate Akka configuration.
   */
  def createAkkaConfig: Option[Config] = {
    import scala.collection.JavaConversions._
    if (enabled) {
      Some(ConfigFactory.empty()
        .withValue("akka.remote.netty.tcp.security.key-store",
          ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse("")))
        .withValue("akka.remote.netty.tcp.security.key-store-password",
          ConfigValueFactory.fromAnyRef(keyStorePassword.getOrElse("")))
        .withValue("akka.remote.netty.tcp.security.trust-store",
          ConfigValueFactory.fromAnyRef(trustStore.map(_.getAbsolutePath).getOrElse("")))
        .withValue("akka.remote.netty.tcp.security.trust-store-password",
          ConfigValueFactory.fromAnyRef(trustStorePassword.getOrElse("")))
        .withValue("akka.remote.netty.tcp.security.key-password",
          ConfigValueFactory.fromAnyRef(keyPassword.getOrElse("")))
        .withValue("akka.remote.netty.tcp.security.random-number-generator",
          ConfigValueFactory.fromAnyRef(""))
        .withValue("akka.remote.netty.tcp.security.protocol",
          ConfigValueFactory.fromAnyRef(protocol.getOrElse("")))
        .withValue("akka.remote.netty.tcp.security.enabled-algorithms",
          ConfigValueFactory.fromIterable(enabledAlgorithms.toSeq))
        .withValue("akka.remote.netty.tcp.enable-ssl",
          ConfigValueFactory.fromAnyRef(true)))
    } else {
      None
    }
  }

  /** Returns a string representation of this SSLOptions with all the passwords masked. */
  override def toString: String = s"SSLOptions{enabled=$enabled, " +
      s"keyStore=$keyStore, keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
      s"trustStore=$trustStore, trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
      s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}"

}

private[spark] object SSLOptions extends Logging {

  /** Resolves SSLOptions settings from a given Spark configuration object at a given namespace.
    *
    * The following settings are allowed:
    * $ - `[ns].enabled` - `true` or `false`, to enable or disable SSL respectively
    * $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory
    * $ - `[ns].keyStorePassword` - a password to the key-store file
    * $ - `[ns].keyPassword` - a password to the private key
    * $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current
    *                         directory
    * $ - `[ns].trustStorePassword` - a password to the trust-store file
    * $ - `[ns].protocol` - a protocol name supported by a particular Java version
    * $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers
    *
    * For a list of protocols and ciphers supported by particular Java versions, you may go to
    * [[https://blogs.oracle.com/java-platform-group/entry/diagnosing_tls_ssl_and_https Oracle
    * blog page]].
    *
    * You can optionally specify the default configuration. If you do, for each setting which is
    * missing in SparkConf, the corresponding setting is used from the default configuration.
    *
    * @param conf Spark configuration object where the settings are collected from
    * @param ns the namespace name
    * @param defaults the default configuration
    * @return [[org.apache.spark.SSLOptions]] object
    */
  def parse(conf: SparkConf, ns: String, defaults: Option[SSLOptions] = None): SSLOptions = {
    val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = defaults.exists(_.enabled))

    val keyStore = conf.getOption(s"$ns.keyStore").map(new File(_))
        .orElse(defaults.flatMap(_.keyStore))

    val keyStorePassword = conf.getOption(s"$ns.keyStorePassword")
        .orElse(defaults.flatMap(_.keyStorePassword))

    val keyPassword = conf.getOption(s"$ns.keyPassword")
        .orElse(defaults.flatMap(_.keyPassword))

    val trustStore = conf.getOption(s"$ns.trustStore").map(new File(_))
        .orElse(defaults.flatMap(_.trustStore))

    val trustStorePassword = conf.getOption(s"$ns.trustStorePassword")
        .orElse(defaults.flatMap(_.trustStorePassword))

    val protocol = conf.getOption(s"$ns.protocol")
        .orElse(defaults.flatMap(_.protocol))

    val enabledAlgorithms = conf.getOption(s"$ns.enabledAlgorithms")
        .map(_.split(",").map(_.trim).filter(_.nonEmpty).toSet)
        .orElse(defaults.map(_.enabledAlgorithms))
        .getOrElse(Set.empty)

    new SSLOptions(
      enabled,
      keyStore,
      keyStorePassword,
      keyPassword,
      trustStore,
      trustStorePassword,
      protocol,
      enabledAlgorithms)
  }

}
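
To make the parsing and fallback behavior above concrete, a hedged usage sketch (conf is a SparkConf populated as in the example near the top of this page):

// Parse the global namespace first, then protocol namespaces that fall back
// to the global options for any setting they do not override.
val defaultOpts = SSLOptions.parse(conf, "spark.ssl", defaults = None)
val fsOpts = SSLOptions.parse(conf, "spark.ssl.fs", defaults = Some(defaultOpts))
val akkaOpts = SSLOptions.parse(conf, "spark.ssl.akka", defaults = Some(defaultOpts))

// Both factory methods return None when SSL is disabled, so callers can
// fall back to plain connectors or configuration cleanly.
val jettyFactory = fsOpts.createJettySslContextFactory()  // Option[SslContextFactory]
val akkaSslConfig = akkaOpts.createAkkaConfig             // Option[Config]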

core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 94 additions & 6 deletions
@@ -18,7 +18,11 @@
 package org.apache.spark

 import java.net.{Authenticator, PasswordAuthentication}
+import java.security.KeyStore
+import java.security.cert.X509Certificate
+import javax.net.ssl._

+import com.google.common.io.Files
 import org.apache.hadoop.io.Text

 import org.apache.spark.deploy.SparkHadoopUtil

@@ -55,7 +59,7 @@ import org.apache.spark.network.sasl.SecretKeyHolder
  * Spark also has a set of admin acls (`spark.admin.acls`) which is a set of users/administrators
  * who always have permission to view or modify the Spark application.
  *
- * Spark does not currently support encryption after authentication.
+ * Starting from version 1.3, Spark has partial support for encrypted connections with SSL.
  *
  * At this point spark has multiple communication protocols that need to be secured and
  * different underlying mechanisms are used depending on the protocol:

@@ -67,8 +71,9 @@ import org.apache.spark.network.sasl.SecretKeyHolder
  *            to connect to the server. There is no control of the underlying
  *            authentication mechanism so it's not clear if the password is passed in
  *            plaintext or uses DIGEST-MD5 or some other mechanism.
- *            Akka also has an option to turn on SSL; this option is not currently supported
- *            but we could add a configuration option in the future.
+ *
+ *            Akka also has an option to turn on SSL; this option is currently supported (see
+ *            the details below).
  *
  *  - HTTP for broadcast and file server (via HttpServer) -> Spark currently uses Jetty
  *            for the HttpServer. Jetty supports multiple authentication mechanisms -

@@ -77,8 +82,9 @@ import org.apache.spark.network.sasl.SecretKeyHolder
  *            to authenticate using DIGEST-MD5 via a single user and the shared secret.
  *            Since we are using DIGEST-MD5, the shared secret is not passed on the wire
  *            in plaintext.
- *            We currently do not support SSL (https), but Jetty can be configured to use it
- *            so we could add a configuration option for this in the future.
+ *
+ *            We currently support SSL (https) for this communication protocol (see the details
+ *            below).
  *
  *  The Spark HttpServer installs the HashLoginServer and configures it to DIGEST-MD5.
  *  Any clients must specify the user and password. There is a default

@@ -142,9 +148,40 @@ import org.apache.spark.network.sasl.SecretKeyHolder
  * authentication. Spark will then use that user to compare against the view acls to do
  * authorization. If no filter is in place the user is generally null and no authorization
  * can take place.
+ *
+ * Connection encryption (SSL) configuration is organized hierarchically. The user can configure
+ * the default SSL settings which will be used for all the supported communication protocols
+ * unless they are overwritten by protocol-specific settings. This way the user can easily
+ * provide the common settings for all the protocols without disabling the ability to configure
+ * each one individually.
+ *
+ * All the SSL settings like `spark.ssl.xxx`, where `xxx` is a particular configuration property,
+ * denote the global configuration for all the supported protocols. In order to override the
+ * global configuration for a particular protocol, the properties must be overwritten in the
+ * protocol-specific namespace. Use `spark.ssl.yyy.xxx` settings to overwrite the global
+ * configuration for the particular protocol denoted by `yyy`. Currently `yyy` can be either
+ * `akka` for Akka-based connections or `fs` for broadcast and file server.
+ *
+ * Refer to [[org.apache.spark.SSLOptions]] documentation for the list of
+ * options that can be specified.
+ *
+ * SecurityManager initializes SSLOptions objects for different protocols separately. An
+ * SSLOptions object parses Spark configuration at a given namespace and builds the common
+ * representation of SSL settings. SSLOptions is then used to provide protocol-specific
+ * configuration like Typesafe configuration for Akka or SslContextFactory for Jetty.
+ *
+ * SSL must be configured on each node and for each component involved in communication using
+ * the particular protocol. In YARN clusters, the key-store can be prepared on the client side
+ * then distributed and used by the executors as part of the application (YARN allows the user
+ * to deploy files before the application is started).
+ * In standalone deployment, the user needs to provide key-stores and configuration
+ * options for master and workers. In this mode, the user may allow the executors to use the SSL
+ * settings inherited from the worker which spawned that executor. It can be accomplished by
+ * setting `spark.ssl.useNodeLocalConf` to `true`.
  */

-private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with SecretKeyHolder {
+private[spark] class SecurityManager(sparkConf: SparkConf)
+  extends Logging with SecretKeyHolder {

   // key used to store the spark secret in the Hadoop UGI
   private val sparkSecretLookupKey = "sparkCookie"

@@ -196,6 +233,57 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with
     )
   }

+  // the default SSL configuration - it will be used by all communication layers
+  // unless overwritten
+  private val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", defaults = None)
+
+  // SSL configuration for different communication layers - they can override the default
+  // configuration at a specified namespace. The namespace *must* start with spark.ssl.
+  val fileServerSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.fs", Some(defaultSSLOptions))
+  val akkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.akka", Some(defaultSSLOptions))
+
+  logDebug(s"SSLConfiguration for file server: $fileServerSSLOptions")
+  logDebug(s"SSLConfiguration for Akka: $akkaSSLOptions")
+
+  val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) {
+    val trustStoreManagers =
+      for (trustStore <- fileServerSSLOptions.trustStore) yield {
+        val input = Files.asByteSource(fileServerSSLOptions.trustStore.get).openStream()
+
+        try {
+          val ks = KeyStore.getInstance(KeyStore.getDefaultType)
+          ks.load(input, fileServerSSLOptions.trustStorePassword.get.toCharArray)
+
+          val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
+          tmf.init(ks)
+          tmf.getTrustManagers
+        } finally {
+          input.close()
+        }
+      }
+
+    lazy val credulousTrustStoreManagers = Array({
+      logWarning("Using 'accept-all' trust manager for SSL connections.")
+      new X509TrustManager {
+        override def getAcceptedIssuers: Array[X509Certificate] = null
+
+        override def checkClientTrusted(x509Certificates: Array[X509Certificate], s: String) {}
+
+        override def checkServerTrusted(x509Certificates: Array[X509Certificate], s: String) {}
+      }: TrustManager
+    })
+
+    val sslContext = SSLContext.getInstance(fileServerSSLOptions.protocol.getOrElse("Default"))
+    sslContext.init(null, trustStoreManagers.getOrElse(credulousTrustStoreManagers), null)
+
+    val hostVerifier = new HostnameVerifier {
+      override def verify(s: String, sslSession: SSLSession): Boolean = true
+    }
+
+    (Some(sslContext.getSocketFactory), Some(hostVerifier))
+  } else {
+    (None, None)
+  }
+
   /**
    * Split a comma separated String, filter out any empty items, and return a Set of strings
    */
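
The hierarchy described in the class comment means a protocol-specific key wins over its global counterpart. A hedged illustration with hypothetical values:

// The Akka layer gets its own key-store, while the file server inherits the
// global one; every other setting falls back to the spark.ssl defaults.
conf.set("spark.ssl.enabled", "true")
conf.set("spark.ssl.keyStore", "/path/to/default.jks")
conf.set("spark.ssl.akka.keyStore", "/path/to/akka.jks")
// => akkaSSLOptions.keyStore       == Some(new File("/path/to/akka.jks"))
//    fileServerSSLOptions.keyStore == Some(new File("/path/to/default.jks"))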

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 1 addition & 0 deletions
@@ -370,6 +370,7 @@ private[spark] object SparkConf {
     isAkkaConf(name) ||
     name.startsWith("spark.akka") ||
     name.startsWith("spark.auth") ||
+    name.startsWith("spark.ssl") ||
     isSparkPortConf(name)
   }

core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala

Lines changed: 1 addition & 0 deletions
@@ -199,6 +199,7 @@ private[broadcast] object HttpBroadcast extends Logging {
       uc = new URL(url).openConnection()
       uc.setConnectTimeout(httpReadTimeout)
     }
+    Utils.setupSecureURLConnection(uc, securityManager)

     val in = {
       uc.setReadTimeout(httpReadTimeout)
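
Utils.setupSecureURLConnection itself is not part of this page's hunks. Given the sslSocketFactory/hostnameVerifier pair built in SecurityManager above, a plausible sketch of the helper - the exact signature and body are assumptions, not the commit's code:

import java.net.URLConnection
import javax.net.ssl.HttpsURLConnection

// Sketch: if the connection is HTTPS, install the socket factory and hostname
// verifier prepared by the SecurityManager; otherwise return it untouched.
def setupSecureURLConnection(uc: URLConnection, sm: SecurityManager): URLConnection = {
  uc match {
    case https: HttpsURLConnection =>
      sm.sslSocketFactory.foreach(https.setSSLSocketFactory)
      sm.hostnameVerifier.foreach(https.setHostnameVerifier)
      https
    case other => other
  }
}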

core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala

Lines changed: 9 additions & 0 deletions
@@ -28,5 +28,14 @@ private[spark] class ApplicationDescription(

   val user = System.getProperty("user.name", "<unknown>")

+  def copy(
+      name: String = name,
+      maxCores: Option[Int] = maxCores,
+      memoryPerSlave: Int = memoryPerSlave,
+      command: Command = command,
+      appUiUrl: String = appUiUrl,
+      eventLogDir: Option[String] = eventLogDir): ApplicationDescription =
+    new ApplicationDescription(name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir)
+
   override def toString: String = "ApplicationDescription(" + name + ")"
 }
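
The new copy mirrors case-class copy semantics on a plain class; per the commit message, standalone mode uses it to swap in a command carrying node-local SSL settings. A hedged usage sketch - desc and the javaOpts tweak are hypothetical:

// Derive a description that differs only in its command, e.g. to inject
// worker-local SSL settings before an executor is launched.
val sslJavaOpts = Seq("-Dspark.ssl.enabled=true")
val updatedDesc = desc.copy(command = desc.command.copy(
  javaOpts = desc.command.javaOpts ++ sslJavaOpts))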
