
Commit f36dc3f

tdas authored and pwendell committed
[SPARK-1386] Web UI for Spark Streaming
When debugging Spark Streaming applications it is necessary to monitor certain metrics that are not shown in the Spark application UI. For example: what is the average processing time of batches? What is the scheduling delay? Is the system able to process data as fast as it is receiving it? How many records am I receiving through my receivers? While the StreamingListener interface introduced in 0.9 provided some of this information, it could only be accessed programmatically. A UI that shows information specific to streaming applications is necessary for easier debugging. This PR introduces such a UI, showing various statistics related to the streaming application. Here is a screenshot of the UI running on my local machine: http://i.imgur.com/1ooDGhm.png This UI is integrated into the Spark UI running at port 4040.

Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Andrew Or <andrewor14@gmail.com>

Closes apache#290 from tdas/streaming-web-ui and squashes the following commits:

fc73ca5 [Tathagata Das] Merge pull request alteryx#9 from andrewor14/ui-refactor
642dd88 [Andrew Or] Merge SparkUISuite.scala into UISuite.scala
eb30517 [Andrew Or] Merge github.com:apache/spark into ui-refactor
f4f4cbe [Tathagata Das] More minor fixes.
34bb364 [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui
252c566 [Tathagata Das] Merge pull request alteryx#8 from andrewor14/ui-refactor
e038b4b [Tathagata Das] Addressed Patrick's comments.
125a054 [Andrew Or] Disable serving static resources with gzip
90feb8d [Andrew Or] Address Patrick's comments
89dae36 [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui
72fe256 [Tathagata Das] Merge pull request alteryx#6 from andrewor14/ui-refactor
2fc09c8 [Tathagata Das] Added binary check exclusions
aa396d4 [Andrew Or] Rename tabs and pages (No more IndexPage.scala)
f8e1053 [Tathagata Das] Added Spark and Streaming UI unit tests.
caa5e05 [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui
585cd65 [Tathagata Das] Merge pull request alteryx#5 from andrewor14/ui-refactor
914b8ff [Tathagata Das] Moved utils functions to UIUtils.
548c98c [Andrew Or] Wide refactoring of WebUI, UITab, and UIPage (see commit message)
6de06b0 [Tathagata Das] Merge remote-tracking branch 'apache/master' into streaming-web-ui
ee6543f [Tathagata Das] Minor changes based on Andrew's comments.
fa760fe [Tathagata Das] Fixed long line.
1c0bcef [Tathagata Das] Refactored streaming UI into two files.
1af239b [Tathagata Das] Changed streaming UI to attach itself as a tab with the Spark UI.
827e81a [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui
168fe86 [Tathagata Das] Merge pull request #2 from andrewor14/ui-refactor
3e986f8 [Tathagata Das] Merge remote-tracking branch 'apache/master' into streaming-web-ui
c78c92d [Andrew Or] Remove outdated comment
8f7323b [Andrew Or] End of file new lines, indentation, and imports (minor)
0d61ee8 [Andrew Or] Merge branch 'streaming-web-ui' of github.com:tdas/spark into ui-refactor
9a48fa1 [Andrew Or] Allow adding tabs to SparkUI dynamically + add example
61358e3 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-web-ui
53be2c5 [Tathagata Das] Minor style updates.
ed25dfc [Andrew Or] Generalize SparkUI header to display tabs dynamically
a37ad4f [Andrew Or] Comments, imports and formatting (minor)
cd000b0 [Andrew Or] Merge github.com:apache/spark into ui-refactor
7d57444 [Andrew Or] Refactoring the UI interface to add flexibility
aef4dd5 [Tathagata Das] Added Apache licenses.
db27bad [Tathagata Das] Added last batch processing time to StreamingUI.
4d86e98 [Tathagata Das] Added basic stats to the StreamingUI and refactored the UI to a Page to make it easier to transition to using SparkUI later.
93f1c69 [Tathagata Das] Added network receiver information to the Streaming UI.
56cc7fb [Tathagata Das] First cut implementation of Streaming UI.
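The metrics the commit message asks about (average batch processing time, scheduling delay) are exactly what a StreamingListener consumer had to compute by hand before this UI existed. As a rough illustration of that computation, here is a self-contained sketch; `BatchInfo` and the field names here are simplified stand-ins modeled on Spark's batch metadata, not the actual `org.apache.spark.streaming.scheduler` API:

```scala
// Simplified stand-in for the batch metadata a StreamingListener receives.
// This is an illustrative model, not Spark's actual BatchInfo class.
case class BatchInfo(
    submissionTime: Long,       // when the batch was submitted for scheduling
    processingStartTime: Long,  // when processing actually began
    processingEndTime: Long,    // when processing finished
    numRecords: Long)

object StreamingStats {
  // Processing time: how long the batch took to compute.
  def processingTime(b: BatchInfo): Long = b.processingEndTime - b.processingStartTime

  // Scheduling delay: how long the batch waited before processing began.
  // If this grows without bound, the system cannot keep up with the input rate.
  def schedulingDelay(b: BatchInfo): Long = b.processingStartTime - b.submissionTime

  def avgProcessingTime(batches: Seq[BatchInfo]): Double =
    if (batches.isEmpty) 0.0
    else batches.map(processingTime).sum.toDouble / batches.size
}
```

The streaming tab introduced by this PR essentially renders these derived values instead of requiring users to register a listener and compute them programmatically.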
1 parent: 4dfcb38 · commit: f36dc3f


54 files changed: +1426 −846 lines

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 0 additions & 1 deletion
```diff
@@ -213,7 +213,6 @@ class SparkContext(config: SparkConf) extends Logging {
   // Initialize the Spark UI, registering all associated listeners
   private[spark] val ui = new SparkUI(this)
   ui.bind()
-  ui.start()
 
   // Optionally log Spark events
   private[spark] val eventLogger: Option[EventLoggingListener] = {
```

core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala

Lines changed: 0 additions & 50 deletions
This file was deleted.

core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala renamed to core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala

Lines changed: 6 additions & 6 deletions
```diff
@@ -21,9 +21,9 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
 
-import org.apache.spark.ui.{UIUtils, WebUI}
+import org.apache.spark.ui.{WebUIPage, UIUtils}
 
-private[spark] class IndexPage(parent: HistoryServer) {
+private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
 
   def render(request: HttpServletRequest): Seq[Node] = {
     val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated }
@@ -62,13 +62,13 @@ private[spark] class IndexPage(parent: HistoryServer) {
   private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
     val appName = if (info.started) info.name else info.logDirPath.getName
     val uiAddress = parent.getAddress + info.ui.basePath
-    val startTime = if (info.started) WebUI.formatDate(info.startTime) else "Not started"
-    val endTime = if (info.completed) WebUI.formatDate(info.endTime) else "Not completed"
+    val startTime = if (info.started) UIUtils.formatDate(info.startTime) else "Not started"
+    val endTime = if (info.completed) UIUtils.formatDate(info.endTime) else "Not completed"
     val difference = if (info.started && info.completed) info.endTime - info.startTime else -1L
-    val duration = if (difference > 0) WebUI.formatDuration(difference) else "---"
+    val duration = if (difference > 0) UIUtils.formatDuration(difference) else "---"
     val sparkUser = if (info.started) info.sparkUser else "Unknown user"
     val logDirectory = info.logDirPath.getName
-    val lastUpdated = WebUI.formatDate(info.lastUpdated)
+    val lastUpdated = UIUtils.formatDate(info.lastUpdated)
     <tr>
       <td><a href={uiAddress}>{appName}</a></td>
       <td>{startTime}</td>
```
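The rename above is more than cosmetic: pages now extend `WebUIPage`, declaring their URL prefix in the constructor so the parent UI can collect them dynamically. A minimal self-contained model of that pattern (class and method names follow the diff, but Jetty, servlets, and the XML rendering are omitted for illustration):

```scala
// Minimal sketch of the page abstraction from this refactor: each page
// declares its URL prefix and knows how to render itself, and the parent
// UI accumulates pages dynamically via attachPage.
abstract class WebUIPage(val prefix: String) {
  def render(): String
}

class WebUI {
  private var pages = Vector.empty[WebUIPage]

  def attachPage(page: WebUIPage): Unit = { pages = pages :+ page }

  // Dispatch a request path to the page registered under that prefix.
  def serve(path: String): Option[String] =
    pages.find(_.prefix == path).map(_.render())
}

// The history page registers at the root prefix "", as in the diff;
// the rendered string here stands in for the real Seq[Node] markup.
class HistoryPage extends WebUIPage("") {
  def render(): String = "<h1>History Server</h1>"
}
```

This is what lets the streaming tab attach itself to the running SparkUI at port 4040 instead of each component hard-wiring its own handler list.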

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 26 additions & 35 deletions
```diff
@@ -17,17 +17,13 @@
 
 package org.apache.spark.deploy.history
 
-import javax.servlet.http.HttpServletRequest
-
 import scala.collection.mutable
 
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.eclipse.jetty.servlet.ServletContextHandler
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
-import org.apache.spark.deploy.SparkUIContainer
 import org.apache.spark.scheduler._
-import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.{WebUI, SparkUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.Utils
 
@@ -46,17 +42,15 @@ import org.apache.spark.util.Utils
  */
 class HistoryServer(
     val baseLogDir: String,
+    securityManager: SecurityManager,
     conf: SparkConf)
-  extends SparkUIContainer("History Server") with Logging {
+  extends WebUI(securityManager, HistoryServer.WEB_UI_PORT, conf) with Logging {
 
   import HistoryServer._
 
   private val fileSystem = Utils.getHadoopFileSystem(baseLogDir)
   private val localHost = Utils.localHostName()
   private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
-  private val port = WEB_UI_PORT
-  private val securityManager = new SecurityManager(conf)
-  private val indexPage = new IndexPage(this)
 
   // A timestamp of when the disk was last accessed to check for log updates
   private var lastLogCheckTime = -1L
@@ -90,37 +84,23 @@ class HistoryServer(
     }
   }
 
-  private val handlers = Seq[ServletContextHandler](
-    createStaticHandler(STATIC_RESOURCE_DIR, "/static"),
-    createServletHandler("/",
-      (request: HttpServletRequest) => indexPage.render(request), securityMgr = securityManager)
-  )
-
   // A mapping of application ID to its history information, which includes the rendered UI
   val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]()
 
+  initialize()
+
   /**
-   * Start the history server.
+   * Initialize the history server.
    *
    * This starts a background thread that periodically synchronizes information displayed on
    * this UI with the event logs in the provided base directory.
    */
-  def start() {
+  def initialize() {
+    attachPage(new HistoryPage(this))
+    attachHandler(createStaticHandler(STATIC_RESOURCE_DIR, "/static"))
     logCheckingThread.start()
   }
 
-  /** Bind to the HTTP server behind this web interface. */
-  override def bind() {
-    try {
-      serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf))
-      logInfo("Started HistoryServer at http://%s:%d".format(publicHost, boundPort))
-    } catch {
-      case e: Exception =>
-        logError("Failed to bind HistoryServer", e)
-        System.exit(1)
-    }
-  }
-
   /**
    * Check for any updates to event logs in the base directory. This is only effective once
    * the server has been bound.
@@ -151,7 +131,7 @@ class HistoryServer(
     // Remove any applications that should no longer be retained
     appIdToInfo.foreach { case (appId, info) =>
       if (!retainedAppIds.contains(appId)) {
-        detachUI(info.ui)
+        detachSparkUI(info.ui)
         appIdToInfo.remove(appId)
       }
     }
@@ -186,15 +166,14 @@ class HistoryServer(
       val path = logDir.getPath
       val appId = path.getName
       val replayBus = new ReplayListenerBus(logInfo.logPaths, fileSystem, logInfo.compressionCodec)
-      val ui = new SparkUI(replayBus, appId, "/history/" + appId)
       val appListener = new ApplicationEventListener
       replayBus.addListener(appListener)
+      val ui = new SparkUI(conf, replayBus, appId, "/history/" + appId)
 
       // Do not call ui.bind() to avoid creating a new server for each application
-      ui.start()
       replayBus.replay()
       if (appListener.applicationStarted) {
-        attachUI(ui)
+        attachSparkUI(ui)
         val appName = appListener.appName
         val sparkUser = appListener.sparkUser
         val startTime = appListener.startTime
@@ -213,6 +192,18 @@ class HistoryServer(
     fileSystem.close()
   }
 
+  /** Attach a reconstructed UI to this server. Only valid after bind(). */
+  private def attachSparkUI(ui: SparkUI) {
+    assert(serverInfo.isDefined, "HistoryServer must be bound before attaching SparkUIs")
+    ui.getHandlers.foreach(attachHandler)
+  }
+
+  /** Detach a reconstructed UI from this server. Only valid after bind(). */
+  private def detachSparkUI(ui: SparkUI) {
+    assert(serverInfo.isDefined, "HistoryServer must be bound before detaching SparkUIs")
+    ui.getHandlers.foreach(detachHandler)
+  }
+
   /** Return the address of this server. */
   def getAddress: String = "http://" + publicHost + ":" + boundPort
 
@@ -262,9 +253,9 @@ object HistoryServer {
 
   def main(argStrings: Array[String]) {
     val args = new HistoryServerArguments(argStrings)
-    val server = new HistoryServer(args.logDir, conf)
+    val securityManager = new SecurityManager(conf)
+    val server = new HistoryServer(args.logDir, securityManager, conf)
     server.bind()
-    server.start()
 
     // Wait until the end of the world... or if the HistoryServer process is manually stopped
     while(true) { Thread.sleep(Int.MaxValue) }
```
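The new `attachSparkUI`/`detachSparkUI` pair enforces an ordering constraint: reconstructed UIs may only be (de)registered once the shared Jetty server is bound, hence the asserts on `serverInfo`. A self-contained sketch of that lifecycle, with names following the diff but Jetty's handler types replaced by plain strings for illustration:

```scala
// Sketch of the bind-then-attach lifecycle used by the refactored
// HistoryServer. serverInfo stands in for Jetty's bound-server state,
// and handlers are plain strings rather than ServletContextHandlers.
class HandlerContainer {
  private var serverInfo: Option[String] = None
  private var handlers = Vector.empty[String]

  def bind(): Unit = { serverInfo = Some("bound") }

  def attachSparkUI(uiHandlers: Seq[String]): Unit = {
    assert(serverInfo.isDefined, "must be bound before attaching SparkUIs")
    handlers = handlers ++ uiHandlers
  }

  def detachSparkUI(uiHandlers: Seq[String]): Unit = {
    assert(serverInfo.isDefined, "must be bound before detaching SparkUIs")
    handlers = handlers.filterNot(uiHandlers.contains)
  }

  def handlerCount: Int = handlers.size
}
```

Routing every application's handlers through one bound server is also why the diff drops `ui.bind()` for each replayed application: one Jetty instance serves all `/history/<appId>` UIs.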

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 4 additions & 4 deletions
```diff
@@ -625,7 +625,7 @@ private[spark] class Master(
     if (completedApps.size >= RETAINED_APPLICATIONS) {
       val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
       completedApps.take(toRemove).foreach( a => {
-        appIdToUI.remove(a.id).foreach { ui => webUi.detachUI(ui) }
+        appIdToUI.remove(a.id).foreach { ui => webUi.detachSparkUI(ui) }
         applicationMetricsSystem.removeSource(a.appSource)
       })
       completedApps.trimStart(toRemove)
@@ -667,12 +667,12 @@ private[spark] class Master(
     if (!eventLogPaths.isEmpty) {
       try {
         val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
-        val ui = new SparkUI(replayBus, appName + " (completed)", "/history/" + app.id)
-        ui.start()
+        val ui = new SparkUI(
+          new SparkConf, replayBus, appName + " (completed)", "/history/" + app.id)
         replayBus.replay()
         app.desc.appUiUrl = ui.basePath
         appIdToUI(app.id) = ui
-        webUi.attachUI(ui)
+        webUi.attachSparkUI(ui)
         return true
       } catch {
         case t: Throwable =>
```

core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala

Lines changed: 7 additions & 6 deletions
```diff
@@ -28,15 +28,16 @@ import org.json4s.JValue
 import org.apache.spark.deploy.JsonProtocol
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
 import org.apache.spark.deploy.master.ExecutorInfo
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{WebUIPage, UIUtils}
 import org.apache.spark.util.Utils
 
-private[spark] class ApplicationPage(parent: MasterWebUI) {
-  val master = parent.masterActorRef
-  val timeout = parent.timeout
+private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") {
+
+  private val master = parent.masterActorRef
+  private val timeout = parent.timeout
 
   /** Executor details for a particular application */
-  def renderJson(request: HttpServletRequest): JValue = {
+  override def renderJson(request: HttpServletRequest): JValue = {
     val appId = request.getParameter("appId")
     val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
     val state = Await.result(stateFuture, timeout)
@@ -96,7 +97,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
     UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
   }
 
-  def executorRow(executor: ExecutorInfo): Seq[Node] = {
+  private def executorRow(executor: ExecutorInfo): Seq[Node] = {
     <tr>
       <td>{executor.id}</td>
       <td>
```

core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala renamed to core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala

Lines changed: 11 additions & 12 deletions
```diff
@@ -25,17 +25,17 @@ import scala.xml.Node
 import akka.pattern.ask
 import org.json4s.JValue
 
-import org.apache.spark.deploy.{JsonProtocol}
+import org.apache.spark.deploy.JsonProtocol
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
 import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
-import org.apache.spark.ui.{WebUI, UIUtils}
+import org.apache.spark.ui.{WebUIPage, UIUtils}
 import org.apache.spark.util.Utils
 
-private[spark] class IndexPage(parent: MasterWebUI) {
-  val master = parent.masterActorRef
-  val timeout = parent.timeout
+private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
+  private val master = parent.masterActorRef
+  private val timeout = parent.timeout
 
-  def renderJson(request: HttpServletRequest): JValue = {
+  override def renderJson(request: HttpServletRequest): JValue = {
     val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
     val state = Await.result(stateFuture, timeout)
     JsonProtocol.writeMasterState(state)
@@ -139,7 +139,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
     UIUtils.basicSparkPage(content, "Spark Master at " + state.uri)
   }
 
-  def workerRow(worker: WorkerInfo): Seq[Node] = {
+  private def workerRow(worker: WorkerInfo): Seq[Node] = {
     <tr>
       <td>
         <a href={worker.webUiAddress}>{worker.id}</a>
@@ -154,8 +154,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
     </tr>
   }
 
-
-  def appRow(app: ApplicationInfo): Seq[Node] = {
+  private def appRow(app: ApplicationInfo): Seq[Node] = {
     <tr>
       <td>
         <a href={"app?appId=" + app.id}>{app.id}</a>
@@ -169,14 +168,14 @@ private[spark] class IndexPage(parent: MasterWebUI) {
       <td sorttable_customkey={app.desc.memoryPerSlave.toString}>
         {Utils.megabytesToString(app.desc.memoryPerSlave)}
       </td>
-      <td>{WebUI.formatDate(app.submitDate)}</td>
+      <td>{UIUtils.formatDate(app.submitDate)}</td>
       <td>{app.desc.user}</td>
       <td>{app.state.toString}</td>
-      <td>{WebUI.formatDuration(app.duration)}</td>
+      <td>{UIUtils.formatDuration(app.duration)}</td>
     </tr>
   }
 
-  def driverRow(driver: DriverInfo): Seq[Node] = {
+  private def driverRow(driver: DriverInfo): Seq[Node] = {
     <tr>
       <td>{driver.id} </td>
       <td>{driver.submitDate}</td>
```
