Skip to content

Commit 9a48fa1

Browse files
committed
Allow adding tabs to SparkUI dynamically + add example
An example of how this is done is in org.apache.spark.ui.FooTab. Run it through bin/spark-class to see what it looks like (which should more or less match your expectations...).
1 parent ed25dfc commit 9a48fa1

File tree

5 files changed

+139
-25
lines changed

5 files changed

+139
-25
lines changed

core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class MasterWebUI(val master: Master, requestedPort: Int)
3535
val masterActorRef = master.self
3636
val timeout = AkkaUtils.askTimeout(master.conf)
3737

38-
/** Initialize all components of the server. Must be called before bind(). */
38+
/** Initialize all components of the server. */
3939
def start() {
4040
attachPage(new ApplicationPage(this))
4141
attachPage(new IndexPage(this))
@@ -59,25 +59,13 @@ class MasterWebUI(val master: Master, requestedPort: Int)
5959
/** Attach a reconstructed UI to this Master UI. Only valid after bind(). */
6060
def attachUI(ui: SparkUI) {
6161
assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs")
62-
val rootHandler = serverInfo.get.rootHandler
63-
for (handler <- ui.getHandlers) {
64-
rootHandler.addHandler(handler)
65-
if (!handler.isStarted) {
66-
handler.start()
67-
}
68-
}
62+
ui.getHandlers.foreach(attachHandler)
6963
}
7064

7165
/** Detach a reconstructed UI from this Master UI. Only valid after bind(). */
7266
def detachUI(ui: SparkUI) {
7367
assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs")
74-
val rootHandler = serverInfo.get.rootHandler
75-
for (handler <- ui.getHandlers) {
76-
if (handler.isStarted) {
77-
handler.stop()
78-
}
79-
rootHandler.removeHandler(handler)
80-
}
68+
ui.getHandlers.foreach(detachHandler)
8169
}
8270
}
8371

core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
3838
worker.conf.get("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)
3939
val timeout = AkkaUtils.askTimeout(worker.conf)
4040

41-
/** Initialize all components of the server. Must be called before bind(). */
41+
/** Initialize all components of the server. */
4242
def start() {
4343
val logPage = new LogPage(this)
4444
attachPage(logPage)
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.ui
19+
20+
import javax.servlet.http.HttpServletRequest
21+
22+
import scala.collection.mutable
23+
import scala.xml.Node
24+
25+
import org.apache.spark.{SparkConf, SparkContext}
26+
import org.apache.spark.SparkContext._
27+
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
28+
29+
/*
30+
* This is an example of how to extend the SparkUI by adding new tabs to it. It is intended
31+
* only as a demonstration and should be removed before merging into master!
32+
*
33+
* bin/spark-class org.apache.spark.ui.FooTab
34+
*/
35+
36+
/** A tab that displays basic information about jobs seen so far. */
37+
private[spark] class FooTab(parent: SparkUI) extends UITab("foo") {
38+
val appName = parent.appName
39+
val basePath = parent.basePath
40+
41+
def start() {
42+
listener = Some(new FooListener)
43+
attachPage(new IndexPage(this))
44+
}
45+
46+
def fooListener: FooListener = {
47+
assert(listener.isDefined, "ExecutorsTab has not started yet!")
48+
listener.get.asInstanceOf[FooListener]
49+
}
50+
51+
def headerTabs: Seq[UITab] = parent.getTabs
52+
}
53+
54+
/** A foo page. Enough said. */
55+
private[spark] class IndexPage(parent: FooTab) extends UIPage("") {
56+
private val appName = parent.appName
57+
private val basePath = parent.basePath
58+
private val listener = parent.fooListener
59+
60+
override def render(request: HttpServletRequest): Seq[Node] = {
61+
val results = listener.jobResultMap.toSeq.sortBy { case (k, _) => k }
62+
val content =
63+
<div class="row-fluid">
64+
<div class="span12">
65+
<strong>Foo Jobs: </strong>
66+
<ul>
67+
{results.map { case (k, v) => <li>Job {k}: <strong>{v}</strong></li> }}
68+
</ul>
69+
</div>
70+
</div>
71+
UIUtils.headerSparkPage(content, basePath, appName, "Foo", parent.headerTabs, parent)
72+
}
73+
}
74+
75+
/** A listener that maintains a mapping between job IDs and job results. */
76+
private[spark] class FooListener extends SparkListener {
77+
val jobResultMap = mutable.Map[Int, String]()
78+
79+
override def onJobEnd(end: SparkListenerJobEnd) {
80+
jobResultMap(end.jobId) = end.jobResult.toString
81+
}
82+
}
83+
84+
85+
/**
86+
* Start a SparkContext and a SparkUI with a FooTab attached.
87+
*/
88+
private[spark] object FooTab {
89+
def main(args: Array[String]) {
90+
val sc = new SparkContext("local", "Foo Tab", new SparkConf)
91+
val fooTab = new FooTab(sc.ui)
92+
sc.ui.attachTab(fooTab)
93+
94+
// Run a few jobs
95+
sc.parallelize(1 to 1000).count()
96+
sc.parallelize(1 to 2000).persist().count()
97+
sc.parallelize(1 to 3000).map(i => (i/2, i)).groupByKey().count()
98+
sc.parallelize(1 to 4000).map(i => (i/2, i)).groupByKey().persist().count()
99+
sc.parallelize(1 to 5000).map(i => (i/2, i)).groupByKey().persist().count()
100+
sc.parallelize(1 to 6000).map(i => (i/2, i)).groupByKey().persist().count()
101+
sc.parallelize(1 to 7000).map(i => (i/2, i)).groupByKey().persist().count()
102+
103+
readLine("\n> Started SparkUI with a Foo tab...")
104+
}
105+
}

core/src/main/scala/org/apache/spark/ui/SparkUI.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,9 @@ private[spark] class SparkUI(
5252

5353
// Maintain executor storage status through Spark events
5454
val storageStatusListener = new StorageStatusListener
55+
listenerBus.addListener(storageStatusListener)
5556

56-
/** Initialize all components of the server. Must be called before bind(). */
57+
/** Initialize all components of the server. */
5758
def start() {
5859
attachTab(new JobProgressTab(this))
5960
attachTab(new BlockManagerTab(this))
@@ -64,14 +65,10 @@ private[spark] class SparkUI(
6465
if (live) {
6566
sc.env.metricsSystem.getServletHandlers.foreach(attachHandler)
6667
}
67-
// Storage status listener must receive events first, as other listeners depend on its state
68-
listenerBus.addListener(storageStatusListener)
69-
getListeners.foreach(listenerBus.addListener)
7068
}
7169

7270
/** Bind to the HTTP server behind this web interface. */
7371
def bind() {
74-
assert(!handlers.isEmpty, "SparkUI has not started yet!")
7572
try {
7673
serverInfo = Some(startJettyServer(bindHost, port, handlers, sc.conf))
7774
logInfo("Started Spark web UI at http://%s:%d".format(publicHost, boundPort))
@@ -82,6 +79,12 @@ private[spark] class SparkUI(
8279
}
8380
}
8481

82+
/** Attach a tab to this UI, along with its corresponding listener if it exists. */
83+
override def attachTab(tab: UITab) {
84+
super.attachTab(tab)
85+
tab.listener.foreach(listenerBus.addListener)
86+
}
87+
8588
/** Stop the server behind this web interface. Only valid after bind(). */
8689
override def stop() {
8790
super.stop()

core/src/main/scala/org/apache/spark/ui/WebUI.scala

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import org.apache.spark.util.Utils
3535
*
3636
* Each WebUI represents a collection of tabs, each of which in turn represents a collection of
3737
* pages. The use of tabs is optional, however; a WebUI may choose to include pages directly.
38-
* All tabs and pages must be attached before bind()'ing the server.
3938
*/
4039
private[spark] abstract class WebUI(securityManager: SecurityManager, basePath: String = "") {
4140
protected val tabs = ArrayBuffer[UITab]()
@@ -46,14 +45,14 @@ private[spark] abstract class WebUI(securityManager: SecurityManager, basePath:
4645
def getHandlers: Seq[ServletContextHandler] = handlers.toSeq
4746
def getListeners: Seq[SparkListener] = tabs.flatMap(_.listener)
4847

49-
/** Attach a tab to this UI, along with all of its attached pages. Only valid before bind(). */
48+
/** Attach a tab to this UI, along with all of its attached pages. */
5049
def attachTab(tab: UITab) {
5150
tab.start()
5251
tab.pages.foreach(attachPage)
5352
tabs += tab
5453
}
5554

56-
/** Attach a page to this UI. Only valid before bind(). */
55+
/** Attach a page to this UI. */
5756
def attachPage(page: UIPage) {
5857
val pagePath = "/" + page.prefix
5958
attachHandler(createServletHandler(pagePath,
@@ -64,9 +63,26 @@ private[spark] abstract class WebUI(securityManager: SecurityManager, basePath:
6463
}
6564
}
6665

67-
/** Attach a handler to this UI. Only valid before bind(). */
66+
/** Attach a handler to this UI. */
6867
def attachHandler(handler: ServletContextHandler) {
6968
handlers += handler
69+
serverInfo.foreach { info =>
70+
info.rootHandler.addHandler(handler)
71+
if (!handler.isStarted) {
72+
handler.start()
73+
}
74+
}
75+
}
76+
77+
/** Detach a handler from this UI. */
78+
def detachHandler(handler: ServletContextHandler) {
79+
handlers -= handler
80+
serverInfo.foreach { info =>
81+
info.rootHandler.removeHandler(handler)
82+
if (handler.isStarted) {
83+
handler.stop()
84+
}
85+
}
7086
}
7187

7288
/** Initialize all components of the server. Must be called before bind(). */
@@ -89,6 +105,7 @@ private[spark] abstract class WebUI(securityManager: SecurityManager, basePath:
89105
}
90106
}
91107

108+
92109
/**
93110
* A tab that represents a collection of pages and a unit of listening for Spark events.
94111
* Associating each tab with a listener is arbitrary and need not be the case.
@@ -108,6 +125,7 @@ private[spark] abstract class UITab(val prefix: String) {
108125
def start()
109126
}
110127

128+
111129
/**
112130
* A page that represents the leaf node in the UI hierarchy.
113131
*

0 commit comments

Comments
 (0)