SPARK-3883 SSL support for HttpServer and Akka #2739
Changes from all commits: fcc3d29, bc1b6a8, 77591cb, f24d854
@@ -57,3 +57,4 @@ dist/*
 .*iws
 logs
 .*scalastyle-output.xml
+ssl.conf.template
@@ -0,0 +1,10 @@
# Spark SSL settings

# ssl.enabled true
# ssl.keyStore /path/to/your/keyStore
# ssl.keyStorePassword password
# ssl.keyPassword password
# ssl.trustStore /path/to/your/trustStore
# ssl.trustStorePassword password
# ssl.enabledAlgorithms [TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA]
# ssl.protocol TLSv1.2
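For reference, these template entries are plain `java.util.Properties` pairs, where the first run of whitespace separates the key from the value. A minimal standalone sketch (not part of the patch) of loading an uncommented variant of this template:

```scala
import java.io.{File, FileReader, FileWriter}
import java.util.Properties

// Hypothetical, uncommented variant of ssl.conf.template, written to a temp file
val file = File.createTempFile("ssl", ".conf")
file.deleteOnExit()
val writer = new FileWriter(file)
writer.write(
  """# Spark SSL settings
    |ssl.enabled true
    |ssl.keyStore /path/to/your/keyStore
    |ssl.protocol TLSv1.2
    |""".stripMargin)
writer.close()

// Properties.load splits each line at the first '=', ':' or whitespace
val props = new Properties()
val reader = new FileReader(file)
try props.load(reader) finally reader.close()
```

This is the same loading mechanism that `SSLOptions.load` below relies on.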
@@ -0,0 +1,177 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import java.io.{File, FileReader}
import java.util.Properties

import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import org.eclipse.jetty.util.ssl.SslContextFactory

import scala.util.Try

case class SSLOptions(
    enabled: Boolean = false,
    keyStore: Option[File] = None,
    keyStorePassword: Option[String] = None,
    keyPassword: Option[String] = None,
    trustStore: Option[File] = None,
    trustStorePassword: Option[String] = None,
    protocol: Option[String] = None,
    enabledAlgorithms: Set[String] = Set.empty) {

  /**
   * Creates a Jetty SSL context factory according to the SSL settings represented by this object.
   */
  def createJettySslContextFactory(): Option[SslContextFactory] = {
    if (enabled) {
      val sslContextFactory = new SslContextFactory()

      keyStore.foreach(file => sslContextFactory.setKeyStorePath(file.getAbsolutePath))
      trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath))
      keyStorePassword.foreach(sslContextFactory.setKeyStorePassword)
      trustStorePassword.foreach(sslContextFactory.setTrustStorePassword)
      keyPassword.foreach(sslContextFactory.setKeyManagerPassword)
      protocol.foreach(sslContextFactory.setProtocol)
      sslContextFactory.setIncludeCipherSuites(enabledAlgorithms.toSeq: _*)
[Reviewer] There are a few options in SslContextFactory that are not covered here, such as: … How hard would it be to have some sort of bypass from the configuration, so that those who want to try those options can? I've done that in the past using BeanUtils; it's a little ugly, but it mostly works.

[Author] The set of options is the intersection of the options supported by SslContextFactory and by the Akka transport. Setting other options, such as the ones you mention, would be inconsistent and would make users feel they had enabled them for all of Spark, not only for Jetty-based connections. If this is that important, I can add these options; nevertheless, I'd display a warning message when they are explicitly set.
      Some(sslContextFactory)
    } else {
      None
    }
  }

  /**
   * Creates an Akka configuration object which contains all the SSL settings represented by this
   * object. It can then be used to compose the final Akka configuration.
   */
  def createAkkaConfig: Option[Config] = {
    import scala.collection.JavaConversions._
    if (enabled) {
      Some(ConfigFactory.empty()
        .withValue("akka.remote.netty.tcp.security.key-store",
          ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse("")))
        .withValue("akka.remote.netty.tcp.security.key-store-password",
          ConfigValueFactory.fromAnyRef(keyStorePassword.getOrElse("")))
        .withValue("akka.remote.netty.tcp.security.trust-store",
          ConfigValueFactory.fromAnyRef(trustStore.map(_.getAbsolutePath).getOrElse("")))
        .withValue("akka.remote.netty.tcp.security.trust-store-password",
          ConfigValueFactory.fromAnyRef(trustStorePassword.getOrElse("")))
        .withValue("akka.remote.netty.tcp.security.key-password",
          ConfigValueFactory.fromAnyRef(keyPassword.getOrElse("")))
        .withValue("akka.remote.netty.tcp.security.random-number-generator",
          ConfigValueFactory.fromAnyRef(""))
        .withValue("akka.remote.netty.tcp.security.protocol",
          ConfigValueFactory.fromAnyRef(protocol.getOrElse("")))
        .withValue("akka.remote.netty.tcp.security.enabled-algorithms",
          ConfigValueFactory.fromIterable(enabledAlgorithms.toSeq))
        .withValue("akka.remote.netty.tcp.enable-ssl",
          ConfigValueFactory.fromAnyRef(true)))
    } else {
      None
    }
  }

}

object SSLOptions extends Logging {
  /**
   * Resolves the SSL configuration file location by checking:
   * - SPARK_SSL_CONFIG_FILE env variable
   * - SPARK_CONF_DIR/ssl.conf
   * - SPARK_HOME/conf/ssl.conf
   */

[Reviewer] I'm not a fan of using env variables. We should be trying to discourage their use, not add new ones. I'd suggest instead an approach based on SparkConf: have a "spark.ssl.config" option, and if it's not set, load it with …

[Author] Having the path to …

[Reviewer] I see. Still, by using this env variable, you're assuming that the hosts where the executors will run actually have that file available. I don't think we should make that assumption. Instead, I'd prefer an approach such as this: https://github.com/apache/spark/blob/master/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala#L69 Basically, any options needed for the executor to connect back to the driver should be passed on the executor's command line. It also fits well into the "everything should go into SparkConf" model (which requires config options to start with …).

[Reviewer] Ah, another thing. I haven't looked at your code to see whether it's handled, but since Akka requires executors to open a listening socket, all the related files (such as the trust store) also need to be sent to the executors somehow; it would be unfortunate to force everybody to copy the files manually to every possible place where executors may run. In YARN mode that should be easy (using the distributed cache), but I have no idea whether that's easy, or even possible, in standalone or Mesos mode.

[Author] Actually, executor hosts have to be configured to work with SSL anyway. The admin needs to create keystores and truststores, so why wouldn't they generate the SSL config file too? As for the other comment: the only way I can see to make it possible to put these settings into the Spark config is to add them as -Dspark.xxx=... strings to the … Still, I cannot see the reason for doing it this way. Remember that SSL works for all Akka communication, including between the Driver, Workers, and Master. So, just to run workers, you need to provide the SSL configuration on each host you want to use, because they are unable to connect to the Master otherwise.

[Author] Am I right in saying that by "worker" you mean "executor"?

[Reviewer] What you refer to as "workers" is generally referred to as "executors" inside Spark, to avoid confusion with the "Worker" daemons in the standalone cluster manager. Still, that doesn't change anything I've written so far.

[Author] The problem is that in standalone mode, the Spark Master and Spark Workers play the role of the cluster manager. The launcher cannot talk to them without setting up an SSL connection. Moreover, the Spark Master cannot talk to the Spark Workers without setting up an SSL connection. I think we are talking about two different things :)

[Reviewer] So backtrack a bit. I think I might be seeing what you mean. There are two different things: … Other than enabling SSL, I don't see why both need to share the same configuration. You can still use different trust stores, certs, and even certain different configs, as long as things end up matching (e.g. if the Master is set up to validate certs, launchers will need a certificate recognized by the Master when trying to launch jobs - but not necessarily the same certificate). That's what I've been talking about: I think it's not reasonable to require the SSL configuration to be manually distributed before you can launch jobs, for the reasons I already pointed out. You could go down the path of having different Akka configs for talking to the Master and for the driver <-> executor communication, but that would require multiple actor systems in the same VM, and it would probably require changes in other places where it's assumed both use the same actor system. So, that being said, do you understand what I'm getting at?

[Author] @vanzin I understand what you mean. I just thought that you didn't understand me :) To be more precise, there is one more type of communication: … For this one, you need SSL configured on all the nodes prior to running any application. My understanding of SSL is that it is not intended to authenticate particular clients (applications), but rather to authenticate hosts. In this regard, a single SSL configuration per host is enough. In the end, we want to encrypt the data being exchanged, and a single SSL configuration per host does the job. We can introduce further improvements later, though.
  def defaultConfigFile: Option[File] = {
    val specifiedFile = Option(System.getenv("SPARK_SSL_CONFIG_FILE")).map(new File(_))
    val sparkConfDir = Option(System.getenv("SPARK_CONF_DIR")).map(new File(_))
    val sparkHomeConfDir = Option(System.getenv("SPARK_HOME")).map(new File(_, "conf"))
    val defaultFile = (sparkConfDir orElse sparkHomeConfDir).map(new File(_, "ssl.conf"))

    specifiedFile orElse defaultFile
  }

  /**
   * Loads the given properties file, with failover to an empty Properties object.
   */
  def load(configFile: File): Properties = {
    logInfo(s"Loading SSL configuration from $configFile")
    try {
      val props = new Properties()
      val reader = new FileReader(configFile)
      try {
        props.load(reader)
        props.put("sslConfigurationFileLocation", configFile.getAbsolutePath)
        props
      } finally {
        reader.close()
      }
    } catch {
      case ex: Throwable =>
        logWarning(s"The SSL configuration file ${configFile.getAbsolutePath} " +
          s"could not be loaded. The underlying exception was: ${ex.getMessage}")
        new Properties
    }
  }
  /**
   * Resolves SSLOptions settings from a given Spark configuration object, at a given namespace.
   * If the SSL settings were loaded from the configuration file, the
   * ``sslConfigurationFileLocation`` property is present in the Spark configuration. The parent
   * directory of that location is used as a base directory to resolve relative paths to the
   * keystore and truststore.
   */
  def parse(conf: SparkConf, ns: String): SSLOptions = {
    val parentDir = conf.getOption("sslConfigurationFileLocation").map(new File(_).getParentFile)
      .getOrElse(new File(".")).toPath

    val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = false)
    val keyStore = Try(conf.get(s"$ns.keyStore")).toOption.map(parentDir.resolve(_).toFile)
    val keyStorePassword = Try(conf.get(s"$ns.keyStorePassword")).toOption
    val keyPassword = Try(conf.get(s"$ns.keyPassword")).toOption
    val trustStore = Try(conf.get(s"$ns.trustStore")).toOption.map(parentDir.resolve(_).toFile)
    val trustStorePassword = Try(conf.get(s"$ns.trustStorePassword")).toOption
    val protocol = Try(conf.get(s"$ns.protocol")).toOption
    val enabledAlgorithms = Try(conf.get(s"$ns.enabledAlgorithms")).toOption
      .map(_.trim.dropWhile(_ == '[').takeWhile(_ != ']'))
      .map(_.split(",").map(_.trim).toSet)
      .getOrElse(Set.empty)

    new SSLOptions(enabled, keyStore, keyStorePassword, keyPassword, trustStore,
      trustStorePassword, protocol, enabledAlgorithms)
  }

  /**
   * Loads the SSL configuration file. If the ``spark.ssl.configFile`` property is present in the
   * system properties, it is assumed to contain the location of the SSL configuration file to
   * use. Otherwise, the location returned by [[SSLOptions.defaultConfigFile]] is used.
   */
  def load(): Properties = {
[Reviewer] I'm a little confused by all the different ways to load configuration here. Why do you need more than one way? Why isn't everything configured in the already existing Spark configuration file that is processed by spark-submit and friends? If you need different SSL configurations for different subsystems, you can do that using namespaces (as I think is what you were aiming at with the method at L140), but I don't see the point of having two, three, or more methods just for loading this configuration in different ways.

[Author] There is one method for loading the configuration. If you mean the config location resolution sequence - I think it is quite obvious: first it takes the system property, then the env variable, then the config directory, and then the home/conf directory. As for the namespaces - yes, my aim was to make it possible to store multiple SSL configurations in a single config file, under different namespaces; in particular, different settings for data, control messages, and the web UI.
    val file = Option(System.getProperty("spark.ssl.configFile"))
      .map(new File(_)) orElse defaultConfigFile

    file.fold {
      logWarning("SSL configuration file not found. SSL will be disabled.")
      new Properties()
    } { file =>
      logInfo(s"Loading SSL configuration from ${file.getAbsolutePath}")
      load(file)
    }
  }

}
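Since `ssl.enabledAlgorithms` uses a bracketed, comma-separated list rather than plain properties syntax, the bracket-stripping logic in `parse` may be easier to see in isolation. A standalone approximation (with an extra `nonEmpty` filter added here for robustness; it is not in the patch):

```scala
// Standalone sketch of the enabledAlgorithms parsing in SSLOptions.parse:
// drop the leading '[', keep everything before ']', split on commas, trim.
def parseAlgorithms(raw: String): Set[String] =
  raw.trim
    .dropWhile(_ == '[')
    .takeWhile(_ != ']')
    .split(",")
    .map(_.trim)
    .filter(_.nonEmpty) // extra guard, not present in the patch
    .toSet

val algos = parseAlgorithms(
  "[TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA]")
```

The same value, stored under a different namespace prefix, would be parsed identically, since `parse` only varies the `$ns.` prefix.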
@@ -18,7 +18,11 @@
package org.apache.spark

import java.net.{Authenticator, PasswordAuthentication}
import java.security.KeyStore
import java.security.cert.X509Certificate
import javax.net.ssl._

import com.google.common.io.Files
import org.apache.hadoop.io.Text

import org.apache.spark.deploy.SparkHadoopUtil

@@ -192,6 +196,43 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
    )
  }

  private[spark] val sslOptions = SSLOptions.parse(sparkConf, "ssl")

  private[spark] val (sslSocketFactory, hostnameVerifier) = if (sslOptions.enabled) {
    val trustStoreManagers =
      for (trustStore <- sslOptions.trustStore) yield {
        val ks = KeyStore.getInstance(KeyStore.getDefaultType)
        ks.load(Files.asByteSource(sslOptions.trustStore.get).openStream(),
          sslOptions.trustStorePassword.get.toCharArray)

        val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
        tmf.init(ks)
        tmf.getTrustManagers
      }
    lazy val credulousTrustStoreManagers = Array({
      logWarning("Using 'accept-all' trust manager for SSL connections.")
      new X509TrustManager {
        override def getAcceptedIssuers: Array[X509Certificate] = null

        override def checkClientTrusted(x509Certificates: Array[X509Certificate], s: String) {}

        override def checkServerTrusted(x509Certificates: Array[X509Certificate], s: String) {}
      }: TrustManager
    })

[Reviewer] Why lazy, if you're using it in the next few lines anyway? Also, shouldn't this be keyed off some option? Some people won't want to disable cert verification.

[Author] Because it is conditionally evaluated - in the case when …
|
||
val sslContext = SSLContext.getInstance(sslOptions.protocol.getOrElse("Default")) | ||
sslContext.init(null, trustStoreManagers getOrElse credulousTrustStoreManagers, null) | ||
|
||
val hostVerifier = new HostnameVerifier { | ||
override def verify(s: String, sslSession: SSLSession): Boolean = true | ||
} | ||
|
||
(Some(sslContext.getSocketFactory), Some(hostVerifier)) | ||
} else { | ||
(None, None) | ||
} | ||
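This hunk only builds `sslSocketFactory` and `hostnameVerifier`; the consuming side is not shown in the diff. As a hedged sketch of how such values are typically wired into JDK HTTPS clients (illustrative JDK usage, not code from the patch; like the "credulous" path above, it disables certificate and hostname verification, so it is for sketching only):

```scala
import java.security.cert.X509Certificate
import javax.net.ssl._

// Accept-all trust manager, analogous to credulousTrustStoreManagers above.
// WARNING: disables certificate validation; for sketching only.
val trustAll: Array[TrustManager] = Array(new X509TrustManager {
  override def getAcceptedIssuers: Array[X509Certificate] = null
  override def checkClientTrusted(chain: Array[X509Certificate], authType: String): Unit = ()
  override def checkServerTrusted(chain: Array[X509Certificate], authType: String): Unit = ()
})

val ctx = SSLContext.getInstance("TLSv1.2")
ctx.init(null, trustAll, null)

// Accept-all hostname verifier, analogous to hostVerifier above
val allHostsValid = new HostnameVerifier {
  override def verify(host: String, session: SSLSession): Boolean = true
}

// Typical consumer-side wiring for JDK HTTPS connections
HttpsURLConnection.setDefaultSSLSocketFactory(ctx.getSocketFactory)
HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid)
```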
|
||
/** | ||
* Split a comma separated String, filter out any empty items, and return a Set of strings | ||
*/ | ||
|
[Reviewer] Shouldn't this be a list? I'm pretty sure it's allowed to have multiple protocols enabled. Also, given POODLE, it's probably a good idea to expose a setting to disable specific protocols, and to have it include "SSLv3" by default.

[Author] AFAIK, you can select only a single protocol; however, you can provide multiple encryption algorithms.
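A small, standalone way to check this on a given JVM: a context requested with a single protocol name still reports the full set of protocol versions it supports. Exact lists are JVM-dependent, so no particular output is assumed here.

```scala
import javax.net.ssl.SSLContext

// A context is requested with a single protocol name...
val ctx = SSLContext.getInstance("TLSv1.2")
ctx.init(null, null, null)

// ...but the JVM may still support, and enable by default, several protocol
// versions on it; what exactly is enabled is implementation-dependent.
val supported = ctx.getSupportedSSLParameters.getProtocols.toSet
val enabled = ctx.getDefaultSSLParameters.getProtocols.toSet
println(s"supported: ${supported.mkString(", ")}")
println(s"enabled by default: ${enabled.mkString(", ")}")
```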