Skip to content

Commit caa5e05

Browse files
committed
Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui
Conflicts: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
2 parents 914b8ff + 585cd65 commit caa5e05

32 files changed

+183
-248
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,6 @@ class SparkContext(config: SparkConf) extends Logging {
212212

213213
// Initialize the Spark UI, registering all associated listeners
214214
private[spark] val ui = new SparkUI(this)
215-
ui.start()
216215
ui.bind()
217216

218217
// Optionally log Spark events

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,9 @@
1717

1818
package org.apache.spark.deploy.history
1919

20-
import javax.servlet.http.HttpServletRequest
21-
2220
import scala.collection.mutable
2321

2422
import org.apache.hadoop.fs.{FileStatus, Path}
25-
import org.eclipse.jetty.servlet.ServletContextHandler
2623

2724
import org.apache.spark.{Logging, SecurityManager, SparkConf}
2825
import org.apache.spark.scheduler._
@@ -45,15 +42,15 @@ import org.apache.spark.util.Utils
4542
*/
4643
class HistoryServer(
4744
val baseLogDir: String,
45+
securityManager: SecurityManager,
4846
conf: SparkConf)
49-
extends WebUI(new SecurityManager(conf)) with Logging {
47+
extends WebUI(securityManager, HistoryServer.WEB_UI_PORT, conf) with Logging {
5048

5149
import HistoryServer._
5250

5351
private val fileSystem = Utils.getHadoopFileSystem(baseLogDir)
5452
private val localHost = Utils.localHostName()
5553
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
56-
private val port = WEB_UI_PORT
5754

5855
// A timestamp of when the disk was last accessed to check for log updates
5956
private var lastLogCheckTime = -1L
@@ -90,30 +87,20 @@ class HistoryServer(
9087
// A mapping of application ID to its history information, which includes the rendered UI
9188
val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]()
9289

90+
initialize()
91+
9392
/**
94-
* Start the history server.
93+
* Initialize the history server.
9594
*
9695
* This starts a background thread that periodically synchronizes information displayed on
9796
* this UI with the event logs in the provided base directory.
9897
*/
99-
def start() {
98+
def initialize() {
10099
attachPage(new IndexPage(this))
101100
attachHandler(createStaticHandler(STATIC_RESOURCE_DIR, "/static"))
102101
logCheckingThread.start()
103102
}
104103

105-
/** Bind to the HTTP server behind this web interface. */
106-
def bind() {
107-
try {
108-
serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf))
109-
logInfo("Started HistoryServer at http://%s:%d".format(publicHost, boundPort))
110-
} catch {
111-
case e: Exception =>
112-
logError("Failed to bind HistoryServer", e)
113-
System.exit(1)
114-
}
115-
}
116-
117104
/**
118105
* Check for any updates to event logs in the base directory. This is only effective once
119106
* the server has been bound.
@@ -179,12 +166,11 @@ class HistoryServer(
179166
val path = logDir.getPath
180167
val appId = path.getName
181168
val replayBus = new ReplayListenerBus(logInfo.logPaths, fileSystem, logInfo.compressionCodec)
182-
val ui = new SparkUI(conf, replayBus, appId, "/history/" + appId)
183169
val appListener = new ApplicationEventListener
184170
replayBus.addListener(appListener)
171+
val ui = new SparkUI(conf, replayBus, appId, "/history/" + appId)
185172

186173
// Do not call ui.bind() to avoid creating a new server for each application
187-
ui.start()
188174
replayBus.replay()
189175
if (appListener.applicationStarted) {
190176
attachUI(ui)
@@ -267,9 +253,9 @@ object HistoryServer {
267253

268254
def main(argStrings: Array[String]) {
269255
val args = new HistoryServerArguments(argStrings)
270-
val server = new HistoryServer(args.logDir, conf)
256+
val securityManager = new SecurityManager(conf)
257+
val server = new HistoryServer(args.logDir, securityManager, conf)
271258
server.bind()
272-
server.start()
273259

274260
// Wait until the end of the world... or if the HistoryServer process is manually stopped
275261
while(true) { Thread.sleep(Int.MaxValue) }

core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ import javax.servlet.http.HttpServletRequest
2121

2222
import scala.xml.Node
2323

24-
import org.apache.spark.ui.{UIPage, UIUtils}
24+
import org.apache.spark.ui.{WebUIPage, UIUtils}
2525

26-
private[spark] class IndexPage(parent: HistoryServer) extends UIPage("") {
26+
private[spark] class IndexPage(parent: HistoryServer) extends WebUIPage("") {
2727

2828
override def render(request: HttpServletRequest): Seq[Node] = {
2929
val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated }

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@ private[spark] class Master(
118118
logInfo("Starting Spark master at " + masterUrl)
119119
// Listen for remote client disconnection events, since they don't go through Akka's watch()
120120
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
121-
webUi.start()
122121
webUi.bind()
123122
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
124123
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
@@ -670,7 +669,6 @@ private[spark] class Master(
670669
val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
671670
val ui = new SparkUI(
672671
new SparkConf, replayBus, appName + " (completed)", "/history/" + app.id)
673-
ui.start()
674672
replayBus.replay()
675673
app.desc.appUiUrl = ui.basePath
676674
appIdToUI(app.id) = ui

core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@ import org.json4s.JValue
2828
import org.apache.spark.deploy.JsonProtocol
2929
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
3030
import org.apache.spark.deploy.master.ExecutorInfo
31-
import org.apache.spark.ui.{UIPage, UIUtils}
31+
import org.apache.spark.ui.{WebUIPage, UIUtils}
3232
import org.apache.spark.util.Utils
3333

3434
private[spark] class ApplicationPage(parent: MasterWebUI)
35-
extends UIPage("app", includeJson = true) {
35+
extends WebUIPage("app", includeJson = true) {
3636

3737
private val master = parent.masterActorRef
3838
private val timeout = parent.timeout

core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@ import org.json4s.JValue
2828
import org.apache.spark.deploy.JsonProtocol
2929
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
3030
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
31-
import org.apache.spark.ui.{UIPage, UIUtils}
31+
import org.apache.spark.ui.{WebUIPage, UIUtils}
3232
import org.apache.spark.util.Utils
3333

34-
private[spark] class IndexPage(parent: MasterWebUI) extends UIPage("", includeJson = true) {
34+
private[spark] class IndexPage(parent: MasterWebUI) extends WebUIPage("", includeJson = true) {
3535
private val master = parent.masterActorRef
3636
private val timeout = parent.timeout
3737

core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.spark.deploy.master.ui
1919

20-
import javax.servlet.http.HttpServletRequest
21-
2220
import org.apache.spark.Logging
2321
import org.apache.spark.deploy.master.Master
2422
import org.apache.spark.ui.{SparkUI, WebUI}
@@ -30,34 +28,22 @@ import org.apache.spark.util.{AkkaUtils, Utils}
3028
*/
3129
private[spark]
3230
class MasterWebUI(val master: Master, requestedPort: Int)
33-
extends WebUI(master.securityMgr) with Logging {
31+
extends WebUI(master.securityMgr, requestedPort, master.conf) with Logging {
3432

35-
private val host = Utils.localHostName()
36-
private val port = requestedPort
3733
val masterActorRef = master.self
3834
val timeout = AkkaUtils.askTimeout(master.conf)
3935

36+
initialize()
37+
4038
/** Initialize all components of the server. */
41-
def start() {
39+
def initialize() {
4240
attachPage(new ApplicationPage(this))
4341
attachPage(new IndexPage(this))
4442
attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
4543
master.masterMetricsSystem.getServletHandlers.foreach(attachHandler)
4644
master.applicationMetricsSystem.getServletHandlers.foreach(attachHandler)
4745
}
4846

49-
/** Bind to the HTTP server behind this web interface. */
50-
def bind() {
51-
try {
52-
serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, master.conf))
53-
logInfo("Started Master web UI at http://%s:%d".format(host, boundPort))
54-
} catch {
55-
case e: Exception =>
56-
logError("Failed to create Master web UI", e)
57-
System.exit(1)
58-
}
59-
}
60-
6147
/** Attach a reconstructed UI to this Master UI. Only valid after bind(). */
6248
def attachUI(ui: SparkUI) {
6349
assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs")

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,6 @@ private[spark] class Worker(
130130
createWorkDir()
131131
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
132132
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
133-
webUi.start()
134133
webUi.bind()
135134
registerWithMaster()
136135

core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@ import org.apache.spark.deploy.JsonProtocol
2828
import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
2929
import org.apache.spark.deploy.master.DriverState
3030
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
31-
import org.apache.spark.ui.{UIPage, UIUtils}
31+
import org.apache.spark.ui.{WebUIPage, UIUtils}
3232
import org.apache.spark.util.Utils
3333

34-
private[spark] class IndexPage(parent: WorkerWebUI) extends UIPage("", includeJson = true) {
34+
private[spark] class IndexPage(parent: WorkerWebUI) extends WebUIPage("", includeJson = true) {
3535
val workerActor = parent.worker.self
3636
val worker = parent.worker
3737
val timeout = parent.timeout

core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@ import javax.servlet.http.HttpServletRequest
2222

2323
import scala.xml.Node
2424

25-
import org.apache.spark.ui.{UIPage, UIUtils}
25+
import org.apache.spark.ui.{WebUIPage, UIUtils}
2626
import org.apache.spark.util.Utils
2727

28-
private[spark] class LogPage(parent: WorkerWebUI) extends UIPage("logPage") {
28+
private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
2929
private val worker = parent.worker
3030
private val workDir = parent.workDir
3131

core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,26 +20,29 @@ package org.apache.spark.deploy.worker.ui
2020
import java.io.File
2121
import javax.servlet.http.HttpServletRequest
2222

23-
import org.apache.spark.Logging
23+
import org.apache.spark.{Logging, SparkConf}
2424
import org.apache.spark.deploy.worker.Worker
2525
import org.apache.spark.ui.{SparkUI, WebUI}
2626
import org.apache.spark.ui.JettyUtils._
27-
import org.apache.spark.util.{AkkaUtils, Utils}
27+
import org.apache.spark.util.AkkaUtils
2828

2929
/**
3030
* Web UI server for the standalone worker.
3131
*/
3232
private[spark]
33-
class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
34-
extends WebUI(worker.securityMgr) with Logging {
33+
class WorkerWebUI(
34+
val worker: Worker,
35+
val workDir: File,
36+
port: Option[Int] = None)
37+
extends WebUI(worker.securityMgr, WorkerWebUI.getUIPort(port, worker.conf), worker.conf)
38+
with Logging {
3539

36-
private val host = Utils.localHostName()
37-
private val port = requestedPort.getOrElse(
38-
worker.conf.getInt("worker.ui.port", WorkerWebUI.DEFAULT_PORT))
3940
val timeout = AkkaUtils.askTimeout(worker.conf)
4041

42+
initialize()
43+
4144
/** Initialize all components of the server. */
42-
def start() {
45+
def initialize() {
4346
val logPage = new LogPage(this)
4447
attachPage(logPage)
4548
attachPage(new IndexPage(this))
@@ -48,21 +51,13 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
4851
(request: HttpServletRequest) => logPage.renderLog(request), worker.securityMgr))
4952
worker.metricsSystem.getServletHandlers.foreach(attachHandler)
5053
}
51-
52-
/** Bind to the HTTP server behind this web interface. */
53-
def bind() {
54-
try {
55-
serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, worker.conf))
56-
logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort))
57-
} catch {
58-
case e: Exception =>
59-
logError("Failed to create Worker web UI", e)
60-
System.exit(1)
61-
}
62-
}
6354
}
6455

6556
private[spark] object WorkerWebUI {
6657
val DEFAULT_PORT = 8081
6758
val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR
59+
60+
def getUIPort(requestedPort: Option[Int], conf: SparkConf): Int = {
61+
requestedPort.getOrElse(conf.getInt("worker.ui.port", WorkerWebUI.DEFAULT_PORT))
62+
}
6863
}

core/src/main/scala/org/apache/spark/ui/SparkUI.scala

Lines changed: 17 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,19 @@ import org.apache.spark.ui.env.EnvironmentTab
2525
import org.apache.spark.ui.exec.ExecutorsTab
2626
import org.apache.spark.ui.jobs.JobProgressTab
2727
import org.apache.spark.ui.storage.BlockManagerTab
28-
import org.apache.spark.util.Utils
2928

3029
/**
3130
* Top level user interface for Spark.
3231
*/
3332
private[spark] class SparkUI(
3433
val sc: SparkContext,
35-
conf: SparkConf,
34+
val conf: SparkConf,
3635
val securityManager: SecurityManager,
3736
val listenerBus: SparkListenerBus,
3837
var appName: String,
3938
val basePath: String = "")
40-
extends WebUI(securityManager, basePath) with Logging {
39+
extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath)
40+
with Logging {
4141

4242
def this(sc: SparkContext) = this(sc, sc.conf, sc.env.securityManager, sc.listenerBus, sc.appName)
4343
def this(conf: SparkConf, listenerBus: SparkListenerBus, appName: String, basePath: String) =
@@ -46,21 +46,14 @@ private[spark] class SparkUI(
4646
// If SparkContext is not provided, assume the associated application is not live
4747
val live = sc != null
4848

49-
private val bindHost = Utils.localHostName()
50-
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)
51-
private val port = conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
52-
5349
// Maintain executor storage status through Spark events
5450
val storageStatusListener = new StorageStatusListener
55-
listenerBus.addListener(storageStatusListener)
5651

57-
/** Set the app name for this UI. */
58-
def setAppName(name: String) {
59-
appName = name
60-
}
52+
initialize()
6153

6254
/** Initialize all components of the server. */
63-
def start() {
55+
def initialize() {
56+
listenerBus.addListener(storageStatusListener)
6457
attachTab(new JobProgressTab(this))
6558
attachTab(new BlockManagerTab(this))
6659
attachTab(new EnvironmentTab(this))
@@ -72,22 +65,14 @@ private[spark] class SparkUI(
7265
}
7366
}
7467

75-
/** Bind to the HTTP server behind this web interface. */
76-
def bind() {
77-
try {
78-
serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, sc.conf))
79-
logInfo("Started Spark Web UI at http://%s:%d".format(publicHost, boundPort))
80-
} catch {
81-
case e: Exception =>
82-
logError("Failed to create Spark web UI", e)
83-
System.exit(1)
84-
}
68+
/** Set the app name for this UI. */
69+
def setAppName(name: String) {
70+
appName = name
8571
}
8672

87-
/** Attach a tab to this UI, along with its corresponding listener if it exists. */
88-
override def attachTab(tab: UITab) {
89-
super.attachTab(tab)
90-
tab.listener.foreach(listenerBus.addListener)
73+
/** Register the given listener with the listener bus. */
74+
def registerListener(listener: SparkListener) {
75+
listenerBus.addListener(listener)
9176
}
9277

9378
/** Stop the server behind this web interface. Only valid after bind(). */
@@ -96,10 +81,14 @@ private[spark] class SparkUI(
9681
logInfo("Stopped Spark web UI at %s".format(appUIAddress))
9782
}
9883

99-
private[spark] def appUIAddress = "http://" + publicHost + ":" + boundPort
84+
private[spark] def appUIAddress = "http://" + publicHostName + ":" + boundPort
10085
}
10186

10287
private[spark] object SparkUI {
10388
val DEFAULT_PORT = 4040
10489
val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
90+
91+
def getUIPort(conf: SparkConf): Int = {
92+
conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
93+
}
10594
}

0 commit comments

Comments
 (0)