
SPARK-3883 SSL support for HttpServer and Akka #2739

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .rat-excludes
@@ -57,3 +57,4 @@ dist/*
.*iws
logs
.*scalastyle-output.xml
ssl.conf.template
10 changes: 10 additions & 0 deletions conf/ssl.conf.template
@@ -0,0 +1,10 @@
# Spark SSL settings

# ssl.enabled true
# ssl.keyStore /path/to/your/keyStore
# ssl.keyStorePassword password
# ssl.keyPassword password
# ssl.trustStore /path/to/your/trustStore
# ssl.trustStorePassword password
# ssl.enabledAlgorithms [TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA]
# ssl.protocol TLSv1.2
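
As an aside (not part of the template or of this patch), the bracketed list format used for ssl.enabledAlgorithms above is the format that SSLOptions.parse tokenizes further down; a minimal sketch of that tokenization:

// Illustration only: how the bracketed algorithm list is expected to be parsed
// (mirrors the dropWhile/takeWhile logic in SSLOptions.parse below).
val raw = "[TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA]"
val algorithms: Set[String] =
  raw.trim.dropWhile(_ == '[').takeWhile(_ != ']').split(",").map(_.trim).toSet
// algorithms == Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA")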
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -25,7 +25,8 @@ import org.apache.spark.util.Utils

private[spark] class HttpFileServer(
securityManager: SecurityManager,
requestedPort: Int = 0)
requestedPort: Int = 0,
conf: SparkConf)
extends Logging {

var baseDir : File = null
@@ -41,7 +42,7 @@ private[spark] class HttpFileServer(
fileDir.mkdir()
jarDir.mkdir()
logInfo("HTTP File server directory is " + baseDir)
httpServer = new HttpServer(baseDir, securityManager, requestedPort, "HTTP file server")
httpServer = new HttpServer(baseDir, securityManager, requestedPort, "HTTP file server", conf)
httpServer.start()
serverUri = httpServer.uri
logDebug("HTTP file server started at: " + serverUri)
14 changes: 10 additions & 4 deletions core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -19,6 +19,7 @@ package org.apache.spark

import java.io.File

import org.eclipse.jetty.server.ssl.SslSocketConnector
import org.eclipse.jetty.util.security.{Constraint, Password}
import org.eclipse.jetty.security.authentication.DigestAuthenticator
import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService}
@@ -45,7 +46,8 @@ private[spark] class HttpServer(
resourceBase: File,
securityManager: SecurityManager,
requestedPort: Int = 0,
serverName: String = "HTTP server")
serverName: String = "HTTP server",
conf: SparkConf)
extends Logging {

private var server: Server = null
@@ -71,7 +73,10 @@
*/
private def doStart(startPort: Int): (Server, Int) = {
val server = new Server()
val connector = new SocketConnector

val connector = securityManager.sslOptions.createJettySslContextFactory()
.map(new SslSocketConnector(_)).getOrElse(new SocketConnector)

connector.setMaxIdleTime(60 * 1000)
connector.setSoLingerTime(-1)
connector.setPort(startPort)
@@ -148,13 +153,14 @@
}

/**
* Get the URI of this HTTP server (http://host:port)
* Get the URI of this HTTP server (http://host:port or https://host:port)
*/
def uri: String = {
if (server == null) {
throw new ServerStateException("Server is not started")
} else {
"http://" + Utils.localIpAddress + ":" + port
val scheme = if (securityManager.sslOptions.enabled) "https" else "http"
s"$scheme://${Utils.localIpAddress}:$port"
}
}
}
177 changes: 177 additions & 0 deletions core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import java.io.{FileReader, File}
import java.util.Properties

import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import org.eclipse.jetty.util.ssl.SslContextFactory

import scala.util.Try

case class SSLOptions(enabled: Boolean = false,
keyStore: Option[File] = None,
keyStorePassword: Option[String] = None,
keyPassword: Option[String] = None,
trustStore: Option[File] = None,
trustStorePassword: Option[String] = None,
protocol: Option[String] = None,
Contributor:

Shouldn't this be a list? I'm pretty sure it's allowed to have multiple protocols enabled.

Also, given POODLE, it's probably a good idea to expose the setting to disable specific protocols, and have it include "SSLv3" by default.

Contributor Author:

AFAIK, you can select only a single protocol; however, you can provide multiple encryption algorithms.

enabledAlgorithms: Set[String] = Set.empty) {

/**
* Creates a Jetty SSL context factory according to the SSL settings represented by this object.
*/
def createJettySslContextFactory(): Option[SslContextFactory] = {
if (enabled) {
val sslContextFactory = new SslContextFactory()

keyStore.foreach(file => sslContextFactory.setKeyStorePath(file.getAbsolutePath))
trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath))
keyStorePassword.foreach(sslContextFactory.setKeyStorePassword)
trustStorePassword.foreach(sslContextFactory.setTrustStorePassword)
keyPassword.foreach(sslContextFactory.setKeyManagerPassword)
protocol.foreach(sslContextFactory.setProtocol)
sslContextFactory.setIncludeCipherSuites(enabledAlgorithms.toSeq: _*)

Contributor:

There are a few options in SslContextFactory that are not covered here, such as:

  • setAllowRenegotiate
  • setNeedClientAuth
  • setSslSessionCacheSize
  • setSslSessionTimeout
  • setTrustAll
  • setValidateCerts
  • setValidatePeerCerts
  • setWantClientAuth

How hard would it be to have some sort of bypass from the configuration so that those who want to try these options can? I've done that in the past using BeanUtils; it's a little ugly, but it mostly works.

Contributor Author:

The set of options is the intersection of the options supported by SslContextFactory and by the Akka transport.

Setting other options, such as the ones you mentioned, would be inconsistent and would make users feel they had enabled them for all of Spark, not only for Jetty-based connections. If this is that important, I can add these options; nevertheless, I'd display a warning message when they are explicitly set.

Some(sslContextFactory)
} else {
None
}
}

/**
* Creates an Akka configuration object which contains all the SSL settings represented by this
* object. It can be used then to compose the ultimate Akka configuration.
*/
def createAkkaConfig: Option[Config] = {
import scala.collection.JavaConversions._
if (enabled) {
Some(ConfigFactory.empty()
.withValue("akka.remote.netty.tcp.security.key-store",
ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse("")))
.withValue("akka.remote.netty.tcp.security.key-store-password",
ConfigValueFactory.fromAnyRef(keyStorePassword.getOrElse("")))
.withValue("akka.remote.netty.tcp.security.trust-store",
ConfigValueFactory.fromAnyRef(trustStore.map(_.getAbsolutePath).getOrElse("")))
.withValue("akka.remote.netty.tcp.security.trust-store-password",
ConfigValueFactory.fromAnyRef(trustStorePassword.getOrElse("")))
.withValue("akka.remote.netty.tcp.security.key-password",
ConfigValueFactory.fromAnyRef(keyPassword.getOrElse("")))
.withValue("akka.remote.netty.tcp.security.random-number-generator",
ConfigValueFactory.fromAnyRef(""))
.withValue("akka.remote.netty.tcp.security.protocol",
ConfigValueFactory.fromAnyRef(protocol.getOrElse("")))
.withValue("akka.remote.netty.tcp.security.enabled-algorithms",
ConfigValueFactory.fromIterable(enabledAlgorithms.toSeq))
.withValue("akka.remote.netty.tcp.enable-ssl",
ConfigValueFactory.fromAnyRef(true)))
} else {
None
}
}

}
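
For context (this helper is not part of the patch), the Option[Config] returned by createAkkaConfig would typically be folded into a base Akka configuration with withFallback, so the SSL keys only take effect when SSL is enabled; a minimal sketch with illustrative names:

// Hypothetical composition of the final actor-system config; `buildAkkaConf` and
// `baseConf` are illustrative names, not identifiers from this patch.
import com.typesafe.config.Config

def buildAkkaConf(baseConf: Config, sslOptions: SSLOptions): Config =
  sslOptions.createAkkaConfig                        // Some(config) only when ssl.enabled is true
    .map(sslConf => sslConf.withFallback(baseConf))  // SSL keys take precedence over the base settings
    .getOrElse(baseConf)                             // otherwise keep the base config untouched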

object SSLOptions extends Logging {

/**
* Resolves the SSL configuration file location by checking:
* - SPARK_SSL_CONFIG_FILE env variable
Contributor:

I'm not a fan of using env variables. We should be trying to discourage their use, not add new ones. I'd suggest instead an approach based on SparkConf. Have a "spark.ssl.config" option, and if it's not set, load it with SSLOptions.getClass.getResourceAsStream("/ssl.conf") (the conf dir is added to the classpath by the scripts in bin/).

Contributor Author:

Having the path to the ssl.conf file in SparkConf is pointless, because the executor needs to use the SSL configuration before it can receive that configuration. This is entirely a host-based setting, not an application-based setting.

Contributor:

I see. Still, by using this env variable, you're assuming the hosts where the executors will run actually have that file available. I don't think we should make that assumption.

Instead, I'd prefer an approach such as this: https://github.com/apache/spark/blob/master/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala#L69

Basically, any options needed for the executor to connect back to the driver should be passed on the executor's command line. It also fits well into the "everything should go into SparkConf" model (which requires config options to start with spark., at least when you want system properties to be loaded automatically).

Contributor:

Ah. Another thing. I haven't looked at your code to see if it's handled, but since akka requires executors to open a listening socket, all the related files (such as the trust store) also need to somehow be sent to the executors... it would kinda suck to force everybody to copy the files manually to all possible places where executors may run.

In Yarn mode that should be easy (using the distributed cache), but no idea whether that's easy or even possible in standalone or mesos...

Contributor Author:

Actually, executor hosts have to be configured to work with SSL. The admin needs to create keystores and truststores, so why wouldn't they generate the ssl config file as well?

For the other comment - the only way I can see to put these settings into the Spark config is to add them as -Dspark.xxx=blablabla strings to the spark.executor.extraJavaOptions config option. It is resolved and used before an executor is spawned by a worker.

Still, I cannot see the reason for doing it this way. Remember that SSL applies to all Akka communication, including between the Driver, Workers and Master. So, just to run workers you need to provide the SSL configuration on each host you want to use, because otherwise they are unable to connect to the Master.

Contributor Author:

Am I right in saying that you understand worker = executor?

Contributor:

What you refer to as "workers" is generally referred to as "executors" inside Spark, to avoid confusion with the "Worker" daemons in the standalone cluster manager. Still, that doesn't change anything I've written so far.

Contributor Author:

The problem is that in standalone mode, the Spark Master + Spark Workers play the role of the cluster manager. The launcher cannot talk to them without setting up an SSL connection. Moreover, the Spark Master cannot talk to the Spark Workers without setting up an SSL connection.

I think that we are talking about two different things :)

Contributor:

So let's backtrack a bit. I think I might be seeing what you mean. There are two different things:

  • Launcher talking to Spark Master

Other than enabling SSL, I don't see why both need to share the same configuration. You can still use different trust stores and certs and even certain different configs, as long as things end up matching (e.g. if the Master is set up to validate certs, launchers will need a certificate recognized by the Master when trying to launch jobs - but not necessarily the same certificate).

  • Driver talking to Executors

That's what I've been talking about. I think it's not reasonable to require the SSL configuration to be manually distributed before you can launch jobs, for the reasons I already pointed out.

You may go down the path of having different Akka configs to talk to the Master and for the driver <-> executor communication, but that would require multiple actor systems in the same VM and it would probably require changes in other places where it's assumed both use the same actor system.

So, that being said, do you understand what I'm getting at?

Contributor Author:

@vanzin I understand what you mean. I just thought that you didn't understand me :)

To be more precise, there is one more type of communication:
the Spark Master talking to the Spark Workers; this needs to be set up independently of any launcher.

For this one, you need SSL configured on all the nodes prior to running any application. My understanding of SSL is that it is not intended to authenticate particular clients (applications) but rather to authenticate hosts. In this regard, a single SSL configuration per host is enough. In the end, we want to encrypt the data being exchanged, and a single SSL configuration per host does the job.

We can introduce further improvements later, though.

* - SPARK_CONF_DIR/ssl.conf
* - SPARK_HOME/conf/ssl.conf
*/
def defaultConfigFile: Option[File] = {
val specifiedFile = Option(System.getenv("SPARK_SSL_CONFIG_FILE")).map(new File(_))
val sparkConfDir = Option(System.getenv("SPARK_CONF_DIR")).map(new File(_))
val sparkHomeConfDir = Option(System.getenv("SPARK_HOME"))
.map(new File(_, "conf"))
val defaultFile = (sparkConfDir orElse sparkHomeConfDir).map(new File(_, "ssl.conf"))

specifiedFile orElse defaultFile
}

/**
* Loads the given properties file with failover to empty Properties object.
*/
def load(configFile: File): Properties = {
logInfo(s"Loading SSL configuration from $configFile")
try {
val props = new Properties()
val reader = new FileReader(configFile)
try {
props.load(reader)
props.put("sslConfigurationFileLocation", configFile.getAbsolutePath)
props
} finally {
reader.close()
}
} catch {
case ex: Throwable =>
logWarning(s"The SSL configuration file ${configFile.getAbsolutePath} " +
s"could not be loaded. The underlying exception was: ${ex.getMessage}")
new Properties
}
}

/**
* Resolves SSLOptions settings from a given Spark configuration object at a given namespace.
* If SSL settings were loaded from the configuration file, ``sslConfigurationFileLocation``
* property is present in the Spark configuration. The parent directory of that location is used
* as a base directory to resolve relative paths to keystore and truststore.
*/
def parse(conf: SparkConf, ns: String): SSLOptions = {
val parentDir = conf.getOption("sslConfigurationFileLocation").map(new File(_).getParentFile)
.getOrElse(new File(".")).toPath

val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = false)
val keyStore = Try(conf.get(s"$ns.keyStore")).toOption.map(parentDir.resolve(_).toFile)
val keyStorePassword = Try(conf.get(s"$ns.keyStorePassword")).toOption
val keyPassword = Try(conf.get(s"$ns.keyPassword")).toOption
val trustStore = Try(conf.get(s"$ns.trustStore")).toOption.map(parentDir.resolve(_).toFile)
val trustStorePassword = Try(conf.get(s"$ns.trustStorePassword")).toOption
val protocol = Try(conf.get(s"$ns.protocol")).toOption
val enabledAlgorithms = Try(conf.get(s"$ns.enabledAlgorithms")).toOption
.map(_.trim.dropWhile(_ == '[')
.takeWhile(_ != ']')).map(_.split(",").map(_.trim).toSet)
.getOrElse(Set.empty)

new SSLOptions(enabled, keyStore, keyStorePassword, keyPassword, trustStore, trustStorePassword,
protocol, enabledAlgorithms)
}

/**
* Loads the SSL configuration file. If ``spark.ssl.configFile`` property is in the system
* properties, it is assumed it contains the SSL configuration file location to be used.
* Otherwise, it uses the location returned by [[SSLOptions.defaultConfigFile]].
*/
def load(): Properties = {
Contributor:

I'm a little confused about all the different ways to load configuration here. Why do you need more than one way? Why isn't everything configured in the already existing Spark configuration file that is processed by spark-submit and friends?

If you need different SSL configurations for different subsystems, you can do that using namespaces (which I think is what you were aiming at with the method at L140), but I don't see the point of having two, three, or more methods just dealing with loading this configuration in different ways.

Contributor Author:

There is one method of loading the configuration. If you mean the config location resolution sequence - I think it is quite straightforward: first it takes the system property, then the env variable, then the config directory, and then the home/conf directory.

For the namespaces - yes, my aim was to make it possible to store multiple SSL configurations in a single config file, under different namespaces. In particular, different settings for data, control messages, and the web UI.

val file = Option(System.getProperty("spark.ssl.configFile"))
.map(new File(_)) orElse defaultConfigFile

file.fold {
logWarning("SSL configuration file not found. SSL will be disabled.")
new Properties()
} { file =>
logInfo(s"Loading SSL configuration from ${file.getAbsolutePath}")
load(file)
}
}

}
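
To illustrate the namespace idea discussed in the review thread above (the ssl.akka and ssl.ui prefixes are hypothetical; only the plain ssl namespace is wired up in this patch), a single configuration could be resolved into separate SSLOptions per subsystem:

// Hypothetical namespaced lookups, assuming a SparkConf instance named `conf` is in scope;
// key prefixes such as ssl.akka.* / ssl.ui.* are illustrative, not introduced by this change.
val akkaSslOptions = SSLOptions.parse(conf, "ssl.akka")  // would read ssl.akka.enabled, ssl.akka.keyStore, ...
val uiSslOptions   = SSLOptions.parse(conf, "ssl.ui")    // would read ssl.ui.enabled, ssl.ui.keyStore, ...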

41 changes: 41 additions & 0 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -18,7 +18,11 @@
package org.apache.spark

import java.net.{Authenticator, PasswordAuthentication}
import java.security.KeyStore
import java.security.cert.X509Certificate
import javax.net.ssl._

import com.google.common.io.Files
import org.apache.hadoop.io.Text

import org.apache.spark.deploy.SparkHadoopUtil
@@ -192,6 +196,43 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
)
}

private[spark] val sslOptions = SSLOptions.parse(sparkConf, "ssl")

private[spark] val (sslSocketFactory, hostnameVerifier) = if (sslOptions.enabled) {
val trustStoreManagers =
for (trustStore <- sslOptions.trustStore) yield {
val ks = KeyStore.getInstance(KeyStore.getDefaultType)
ks.load(Files.asByteSource(sslOptions.trustStore.get).openStream(),
sslOptions.trustStorePassword.get.toCharArray)

val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
tmf.init(ks)
tmf.getTrustManagers
}

lazy val credulousTrustStoreManagers = Array({
Contributor:

Why lazy if you're using it in the next few lines anyway? Also, shouldn't this be keyed off some option? Some people won't want to disable cert verification.

Contributor Author:

Because it is conditionally evaluated - only in the case when trustStoreManagers is undefined.

logWarning("Using 'accept-all' trust manager for SSL connections.")
new X509TrustManager {
override def getAcceptedIssuers: Array[X509Certificate] = null

override def checkClientTrusted(x509Certificates: Array[X509Certificate], s: String) {}

override def checkServerTrusted(x509Certificates: Array[X509Certificate], s: String) {}
}: TrustManager
})

val sslContext = SSLContext.getInstance(sslOptions.protocol.getOrElse("Default"))
sslContext.init(null, trustStoreManagers getOrElse credulousTrustStoreManagers, null)

val hostVerifier = new HostnameVerifier {
override def verify(s: String, sslSession: SSLSession): Boolean = true
}

(Some(sslContext.getSocketFactory), Some(hostVerifier))
} else {
(None, None)
}

/**
* Split a comma separated String, filter out any empty items, and return a Set of strings
*/
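
A side note on the lazy val question in the thread above: Option.getOrElse takes its default argument by name, so the "accept-all" trust manager (and its warning) is only built when no trust store was configured. A minimal, self-contained illustration (not Spark code):

// Option.getOrElse evaluates its default argument by name, so combined with the
// lazy val above, the fallback is only constructed when the Option is empty.
def getOrElseDemo(): Unit = {
  lazy val fallback = { println("building fallback"); 42 }
  println(Some(1).getOrElse(fallback))  // prints 1; fallback is never built
  println(None.getOrElse(fallback))     // prints "building fallback", then 42
}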
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -48,6 +48,11 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
private[spark] val settings = new HashMap[String, String]()

if (loadDefaults) {
// Load SSL settings from SSL configuration file
for ((k, v) <- SSLOptions.load().asScala) {
settings(k) = v
}

// Load any spark.* system properties
for ((k, v) <- System.getProperties.asScala if k.startsWith("spark.")) {
settings(k) = v
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -239,7 +239,7 @@ object SparkEnv extends Logging {
val httpFileServer =
if (isDriver) {
val fileServerPort = conf.getInt("spark.fileserver.port", 0)
val server = new HttpFileServer(securityManager, fileServerPort)
val server = new HttpFileServer(securityManager, fileServerPort, conf)
server.initialize()
conf.set("spark.fileserver.uri", server.serverUri)
server
@@ -153,7 +153,8 @@ private[broadcast] object HttpBroadcast extends Logging {
private def createServer(conf: SparkConf) {
broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
val broadcastPort = conf.getInt("spark.broadcast.port", 0)
server = new HttpServer(broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
server = new HttpServer(broadcastDir, securityManager,
broadcastPort, "HTTP broadcast server", conf)
server.start()
serverUri = server.uri
logInfo("Broadcast server started at " + serverUri)
@@ -196,6 +197,7 @@
logDebug("broadcast not using security")
uc = new URL(url).openConnection()
}
Utils.setupSecureURLConnection(uc, securityManager)

val in = {
uc.setReadTimeout(httpReadTimeout)
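
The hunk above adds a call to Utils.setupSecureURLConnection, whose body is not shown in this diff; presumably it attaches the SecurityManager's SSL socket factory and hostname verifier to HTTPS connections, roughly along these lines (a sketch, not part of the patch as shown):

// Presumed shape of Utils.setupSecureURLConnection: HTTPS connections get the
// SecurityManager's socket factory and hostname verifier, plain HTTP passes through.
import java.net.URLConnection
import javax.net.ssl.HttpsURLConnection

def setupSecureURLConnection(urlConnection: URLConnection, sm: SecurityManager): URLConnection = {
  urlConnection match {
    case https: HttpsURLConnection =>
      sm.sslSocketFactory.foreach(https.setSSLSocketFactory)
      sm.hostnameVerifier.foreach(https.setHostnameVerifier)
      https
    case other => other
  }
}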
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -39,7 +39,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
val timeout = AkkaUtils.askTimeout(conf)

override def preStart() = {
masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master))
masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master, conf))

context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

@@ -77,7 +77,7 @@ private[spark] class AppClient(
def tryRegisterAllMasters() {
for (masterUrl <- masterUrls) {
logInfo("Connecting to master " + masterUrl + "...")
val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
val actor = context.actorSelection(Master.toAkkaUrl(masterUrl, conf))
actor ! RegisterApplication(appDescription)
}
}
@@ -104,17 +104,17 @@

def changeMaster(url: String) {
activeMasterUrl = url
master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl, conf))
masterAddress = activeMasterUrl match {
case Master.sparkUrlRegex(host, port) =>
Address("akka.tcp", Master.systemName, host, port.toInt)
Address(AkkaUtils.protocol(conf), Master.systemName, host, port.toInt)
case x =>
throw new SparkException("Invalid spark URL: " + x)
}
}

private def isPossibleMaster(remoteUrl: Address) = {
masterUrls.map(s => Master.toAkkaUrl(s))
masterUrls.map(s => Master.toAkkaUrl(s, conf))
.map(u => AddressFromURIString(u).hostPort)
.contains(remoteUrl.hostPort)
}
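
AkkaUtils.protocol, referenced throughout this hunk, is defined elsewhere in the patch; it presumably selects the actor-system URL scheme based on whether SSL is enabled for Akka, for example:

// Presumed shape of AkkaUtils.protocol: pick the Akka remoting scheme that matches
// the SSL setting (the exact config key used here is an assumption).
def protocol(conf: SparkConf): String =
  if (conf.getBoolean("ssl.enabled", defaultValue = false)) "akka.ssl.tcp" else "akka.tcp"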