[SPARK-4449][Core] Specify port range in Spark #5722

Closed · wants to merge 21 commits · changes shown from all commits
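The diff below threads every port setting through as a String rather than an Int, so a caller can pass not only a single port but, presumably, a range of candidate ports for Spark to try when binding its services (the SPARK-4449 goal). The PR description and the exact range syntax are not captured in this view, so the helper below is a hypothetical sketch of how such a spec string might be resolved into candidate ports, assuming a "min-max" syntax:

    // Hypothetical helper, not part of this diff: resolve a port spec string
    // into the sequence of candidate ports to try, assuming "min-max" syntax.
    def candidatePorts(spec: String): Seq[Int] = spec.split("-") match {
      case Array(single)   => Seq(single.trim.toInt)           // "7077"      -> 7077 only
      case Array(min, max) => min.trim.toInt to max.trim.toInt // "1100-1200" -> each in order
      case _               => throw new IllegalArgumentException(s"Malformed port spec: $spec")
    }

Every signature change in the diff is consistent with this shape: port specs stay strings until the moment a socket is actually bound.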
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -26,7 +26,7 @@ import org.apache.spark.util.Utils
 private[spark] class HttpFileServer(
     conf: SparkConf,
     securityManager: SecurityManager,
-    requestedPort: Int = 0)
+    requestedPort: String = "0")
   extends Logging {

   var baseDir : File = null
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -46,12 +46,12 @@ private[spark] class HttpServer(
     conf: SparkConf,
     resourceBase: File,
     securityManager: SecurityManager,
-    requestedPort: Int = 0,
+    requestedPort: String = "0",
     serverName: String = "HTTP server")
   extends Logging {

   private var server: Server = null
-  private var port: Int = requestedPort
+  private var port: Int = 0

   def start() {
     if (server != null) {
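Note the second change here: `port` was previously seeded from requestedPort, but a spec string can no longer initialize an Int field, so it starts at 0 and is presumably assigned the actually bound port inside start(). Spark already has retry logic for binding (Utils.startServiceOnPort), which the full change most likely reuses; the sketch below only illustrates the idea, using the hypothetical candidatePorts helper from above:

    import java.net.{BindException, ServerSocket}

    // Sketch only: try each candidate until one binds; a lone "0" still means
    // "let the OS pick any free ephemeral port".
    def bindToSpec(spec: String): Int = {
      val bound = candidatePorts(spec).view.map { p =>
        try {
          val socket = new ServerSocket(p) // throws if the port is taken
          val actual = socket.getLocalPort // resolves a requested 0 to the real port
          socket.close()
          Some(actual)
        } catch {
          case _: java.io.IOException => None // busy; fall through to the next candidate
        }
      }.collectFirst { case Some(p) => p }
      bound.getOrElse(throw new BindException(s"No free port in spec '$spec'"))
    }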
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -184,7 +184,7 @@ object SparkEnv extends Logging {
     assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
     assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
     val hostname = conf.get("spark.driver.host")
-    val port = conf.get("spark.driver.port").toInt
+    val port = conf.get("spark.driver.port")
     create(
       conf,
       SparkContext.DRIVER_IDENTIFIER,
@@ -205,7 +205,7 @@
       conf: SparkConf,
       executorId: String,
       hostname: String,
-      port: Int,
+      port: String,
       numCores: Int,
       isLocal: Boolean): SparkEnv = {
     val env = create(
@@ -228,7 +228,7 @@
       conf: SparkConf,
       executorId: String,
       hostname: String,
-      port: Int,
+      port: String,
       isDriver: Boolean,
       isLocal: Boolean,
       listenerBus: LiveListenerBus = null,
@@ -345,7 +345,7 @@ object SparkEnv extends Logging {

     val httpFileServer =
       if (isDriver) {
-        val fileServerPort = conf.getInt("spark.fileserver.port", 0)
+        val fileServerPort = conf.get("spark.fileserver.port", "0")
         val server = new HttpFileServer(conf, securityManager, fileServerPort)
         server.initialize()
         conf.set("spark.fileserver.uri", server.serverUri)
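All conf.getInt reads for ports become conf.get with string defaults, so user-supplied values stay strings end to end. Under the assumed range syntax, configuring a driver would look like this (hypothetical port values):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.driver.port", "50000-50100") // assumed range syntax: try 50000 through 50100
      .set("spark.fileserver.port", "0")       // "0" keeps the old "any free port" behavior
      .set("spark.broadcast.port", "51000")    // a single fixed port still works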
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -152,7 +152,7 @@ private[broadcast] object HttpBroadcast extends Logging {

   private def createServer(conf: SparkConf) {
     broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf), "broadcast")
-    val broadcastPort = conf.getInt("spark.broadcast.port", 0)
+    val broadcastPort = conf.get("spark.broadcast.port", "0")
     server =
       new HttpServer(conf, broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
     server.start()
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -195,7 +195,7 @@ object Client {
     Logger.getRootLogger.setLevel(driverArgs.logLevel)

     val (actorSystem, _) = AkkaUtils.createActorSystem(
-      "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
+      "driverClient", Utils.localHostName(), "0", conf, new SecurityManager(conf))

     // Verify driverArgs.master is a valid url so that we can use it in ClientActor safely
     for (m <- driverArgs.masters) {
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -56,15 +56,15 @@ class LocalSparkCluster(

     /* Start the Master */
     val (masterSystem, masterPort, webUiPort, _) =
-      Master.startSystemAndActor(localHostname, 0, 0, _conf)
+      Master.startSystemAndActor(localHostname, "0", "0", _conf)
     masterWebUIPort = webUiPort
     masterActorSystems += masterSystem
     val masterUrl = "spark://" + Utils.localHostNameForURI() + ":" + masterPort
     val masters = Array(masterUrl)

     /* Start the Workers */
     for (workerNum <- 1 to numWorkers) {
-      val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker,
+      val (workerSystem, _) = Worker.startSystemAndActor(localHostname, "0", "0", coresPerWorker,
         memoryPerWorker, masters, null, Some(workerNum), _conf)
       workerActorSystems += workerSystem
     }
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -46,7 +46,7 @@ private[spark] object TestClient {
   def main(args: Array[String]) {
     val url = args(0)
     val conf = new SparkConf
-    val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localHostName(), 0,
+    val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localHostName(), "0",
       conf = conf, securityManager = new SecurityManager(conf))
     val desc = new ApplicationDescription("TestClient", Some(1), 512,
       Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), "ignored")
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -46,7 +46,7 @@ class HistoryServer(
     conf: SparkConf,
     provider: ApplicationHistoryProvider,
     securityManager: SecurityManager,
-    port: Int)
+    port: String)
   extends WebUI(securityManager, port, conf) with Logging with UIRoot {

   // How many applications to retain
@@ -225,7 +225,7 @@ object HistoryServer extends Logging {
       .newInstance(conf)
       .asInstanceOf[ApplicationHistoryProvider]

-    val port = conf.getInt("spark.history.ui.port", 18080)
+    val port = conf.get("spark.history.ui.port", "18080")

     val server = new HistoryServer(conf, provider, securityManager, port)
     server.bind()
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -52,7 +52,7 @@ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, SignalLogger
 private[master] class Master(
     host: String,
     port: Int,
-    webUiPort: Int,
+    webUiPort: String,
     val securityMgr: SecurityManager,
     val conf: SparkConf)
   extends Actor with ActorLogReceive with Logging with LeaderElectable {
@@ -129,7 +129,7 @@ private[master] class Master(
   private val restServerEnabled = conf.getBoolean("spark.master.rest.enabled", true)
   private val restServer =
     if (restServerEnabled) {
-      val port = conf.getInt("spark.master.rest.port", 6066)
+      val port = conf.get("spark.master.rest.port", "6066")
       Some(new StandaloneRestServer(host, port, conf, self, masterUrl))
     } else {
       None
@@ -931,8 +931,8 @@ private[deploy] object Master extends Logging {
   */
  def startSystemAndActor(
      host: String,
-      port: Int,
-      webUiPort: Int,
+      port: String,
+      webUiPort: String,
      conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
14 changes: 7 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
@@ -25,19 +25,19 @@ import org.apache.spark.util.{IntParam, Utils}
  */
 private[master] class MasterArguments(args: Array[String], conf: SparkConf) {
   var host = Utils.localHostName()
-  var port = 7077
-  var webUiPort = 8080
+  var port = "7077"
+  var webUiPort = "8080"
   var propertiesFile: String = null

   // Check for settings in environment variables
   if (System.getenv("SPARK_MASTER_HOST") != null) {
     host = System.getenv("SPARK_MASTER_HOST")
   }
   if (System.getenv("SPARK_MASTER_PORT") != null) {
-    port = System.getenv("SPARK_MASTER_PORT").toInt
+    port = System.getenv("SPARK_MASTER_PORT")
   }
   if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
-    webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
+    webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT")
   }

   parse(args.toList)
@@ -46,7 +46,7 @@ private[master] class MasterArguments(args: Array[String], conf: SparkConf) {
   propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)

   if (conf.contains("spark.master.ui.port")) {
-    webUiPort = conf.get("spark.master.ui.port").toInt
+    webUiPort = conf.get("spark.master.ui.port")
   }

   private def parse(args: List[String]): Unit = args match {
@@ -60,11 +60,11 @@ private[master] class MasterArguments(args: Array[String], conf: SparkConf) {
       host = value
       parse(tail)

-    case ("--port" | "-p") :: IntParam(value) :: tail =>
+    case ("--port" | "-p") :: value :: tail =>
       port = value
       parse(tail)

-    case "--webui-port" :: IntParam(value) :: tail =>
+    case "--webui-port" :: value :: tail =>
       webUiPort = value
       parse(tail)
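Dropping the IntParam extractor means --port and --webui-port now match any token, so a malformed spec would only fail later, at bind time. An eager-validation variant of the same case clause (a sketch, not what this diff does, and again assuming the hypothetical candidatePorts helper):

    case ("--port" | "-p") :: value :: tail =>
      // Fail fast on a malformed spec instead of waiting for bind time.
      require(scala.util.Try(candidatePorts(value)).isSuccess, s"Invalid port spec: $value")
      port = value
      parse(tail)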
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -29,7 +29,7 @@ import org.apache.spark.util.RpcUtils
  * Web UI server for the standalone master.
  */
 private[master]
-class MasterWebUI(val master: Master, requestedPort: Int)
+class MasterWebUI(val master: Master, requestedPort: String)
   extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging
   with UIRoot {
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
@@ -23,9 +23,9 @@ import org.apache.spark.util.{IntParam, Utils}

 private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf: SparkConf) {
   var host = Utils.localHostName()
-  var port = 7077
+  var port = "7077"
   var name = "Spark Cluster"
-  var webUiPort = 8081
+  var webUiPort = "8081"
   var masterUrl: String = _
   var zookeeperUrl: Option[String] = None
   var propertiesFile: String = _
@@ -40,11 +40,11 @@ private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf:
       host = value
       parse(tail)

-    case ("--port" | "-p") :: IntParam(value) :: tail =>
+    case ("--port" | "-p") :: value :: tail =>
       port = value
       parse(tail)

-    case ("--webui-port" | "-p") :: IntParam(value) :: tail =>
+    case ("--webui-port" | "-p") :: value :: tail =>
       webUiPort = value
       parse(tail)
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
@@ -27,7 +27,7 @@ import org.apache.spark.ui.{SparkUI, WebUI}
  */
 private[spark] class MesosClusterUI(
     securityManager: SecurityManager,
-    port: Int,
+    port: String,
     conf: SparkConf,
     dispatcherPublicAddress: String,
     val scheduler: MesosClusterScheduler)
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
@@ -48,7 +48,7 @@ import org.apache.spark.util.Utils
  */
 private[spark] abstract class RestSubmissionServer(
     val host: String,
-    val requestedPort: Int,
+    val requestedPort: String,
     val masterConf: SparkConf) extends Logging {
   protected val submitRequestServlet: SubmitRequestServlet
   protected val killRequestServlet: KillRequestServlet
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -50,7 +50,7 @@ import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
  */
 private[deploy] class StandaloneRestServer(
     host: String,
-    requestedPort: Int,
+    requestedPort: String,
     masterConf: SparkConf,
     masterActor: ActorRef,
     masterUrl: String)
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
@@ -40,7 +40,7 @@ import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
  */
 private[spark] class MesosRestServer(
     host: String,
-    requestedPort: Int,
+    requestedPort: String,
     masterConf: SparkConf,
     scheduler: MesosClusterScheduler)
   extends RestSubmissionServer(host, requestedPort, masterConf) {
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -39,7 +39,7 @@ object DriverWrapper
     case workerUrl :: userJar :: mainClass :: extraArgs =>
       val conf = new SparkConf()
       val rpcEnv = RpcEnv.create("Driver",
-        Utils.localHostName(), 0, conf, new SecurityManager(conf))
+        Utils.localHostName(), "0", conf, new SecurityManager(conf))
       rpcEnv.setupEndpoint("workerWatcher", new WorkerWatcher(rpcEnv, workerUrl))

       val currentLoader = Thread.currentThread.getContextClassLoader
19 changes: 10 additions & 9 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -46,7 +46,7 @@ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
 private[worker] class Worker(
     host: String,
     port: Int,
-    webUiPort: Int,
+    webUiPort: String,
     cores: Int,
     memory: Int,
     masterAkkaUrls: Array[String],
@@ -59,7 +59,6 @@ private[worker] class Worker(
   import context.dispatcher

   Utils.checkHost(host, "Expected hostname")
-  assert (port > 0)

   // For worker and executor IDs
   private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
@@ -271,8 +270,8 @@ private[worker] class Worker(
           INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
       }
     case Some(_) =>
-      logInfo("Not spawning another attempt to register with the master, since there is an" +
-        " attempt scheduled already.")
+      logInfo("Not spawning another attempt to register with the master, " +
+        "since there is an attempt scheduled already.")
   }
 }
@@ -283,7 +282,8 @@ private[worker] class Worker(
     changeMaster(masterUrl, masterWebUiUrl)
     context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
     if (CLEANUP_ENABLED) {
-      logInfo(s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
+      logInfo(s"Worker cleanup enabled; " +
+        s"old application directories will be deleted in: $workDir")
       context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
         CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
     }
@@ -413,7 +413,7 @@ private[worker] class Worker(

     case KillExecutor(masterUrl, appId, execId) =>
       if (masterUrl != activeMasterUrl) {
-        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor " + execId)
+        logWarning(s"Invalid Master ($masterUrl) attempted to launch executor $execId")
       } else {
         val fullId = appId + "/" + execId
         executors.get(fullId) match {
@@ -456,7 +456,8 @@ private[worker] class Worker(
     case DriverStateChanged(driverId, state, exception) => {
       state match {
         case DriverState.ERROR =>
-          logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
+          logWarning(s"Driver $driverId failed " +
+            s"with unrecoverable exception: ${exception.get}")
         case DriverState.FAILED =>
           logWarning(s"Driver $driverId exited with failure")
         case DriverState.FINISHED =>
@@ -537,8 +538,8 @@ private[deploy] object Worker extends Logging {

   def startSystemAndActor(
       host: String,
-      port: Int,
-      webUiPort: Int,
+      port: String,
+      webUiPort: String,
       cores: Int,
       memory: Int,
       masterUrls: Array[String],
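The deleted `assert (port > 0)` follows from the same shift: at construction time the worker now holds an unparsed spec string, so a numeric check only makes sense once a concrete port has been chosen. Putting it together, a caller could start a worker on explicit ranges like this (hypothetical host and port values; the argument order mirrors the LocalSparkCluster call above):

    // Probe 41000-41050 for the worker's RPC port and 8081-8090 for its web UI,
    // instead of the "0"/"0" (any free port) that LocalSparkCluster passes.
    val (workerSystem, _) = Worker.startSystemAndActor(
      Utils.localHostName(), "41000-41050", "8081-8090",
      4, 8192, Array("spark://master:7077"), null, Some(1), new SparkConf())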