Skip to content

Commit 4b398d0

Browse files
committed
expose UI data as json in new endpoints
1 parent bc36356 commit 4b398d0

File tree

64 files changed

+2401
-107
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

64 files changed

+2401
-107
lines changed

core/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,11 @@
214214
<artifactId>json4s-jackson_${scala.binary.version}</artifactId>
215215
<version>3.2.10</version>
216216
</dependency>
217+
<dependency>
218+
<groupId>com.fasterxml.jackson.module</groupId>
219+
<artifactId>jackson-module-scala_2.10</artifactId>
220+
<version>2.3.1</version>
221+
</dependency>
217222
<dependency>
218223
<groupId>org.apache.mesos</groupId>
219224
<artifactId>mesos</artifactId>
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package org.apache.spark.status.api;/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
public enum StageStatus {
19+
Active,
20+
Complete,
21+
Failed,
22+
Pending
23+
}

core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import org.json4s.JsonDSL._
2222
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
2323
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
2424
import org.apache.spark.deploy.worker.ExecutorRunner
25+
import org.json4s._
2526

2627
private[spark] object JsonProtocol {
2728
def writeWorkerInfo(obj: WorkerInfo) = {
@@ -98,4 +99,5 @@ private[spark] object JsonProtocol {
9899
("executors" -> obj.executors.toList.map(writeExecutorRunner)) ~
99100
("finishedexecutors" -> obj.finishedExecutors.toList.map(writeExecutorRunner))
100101
}
102+
101103
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.history
18+
19+
import javax.servlet.http.HttpServletRequest
20+
21+
import org.apache.spark.status.{UIRoot, StatusJsonRoute}
22+
import org.apache.spark.status.api.ApplicationInfo
23+
import org.apache.spark.deploy.master.{ApplicationInfo => InternalApplicationInfo}
24+
25+
/**
 * Json route that serves the list of all applications known to the given [[UIRoot]].
 */
class AllApplicationsJsonRoute(val uiRoot: UIRoot) extends StatusJsonRoute[Seq[ApplicationInfo]] {

  // TODO: filter on some query params, e.g. completed, minStartTime, etc.
  override def renderJson(request: HttpServletRequest): Seq[ApplicationInfo] =
    uiRoot.getApplicationInfoList
}
33+
34+
object AllApplicationsJsonRoute {

  /**
   * Converts a history server [[ApplicationHistoryInfo]] into the public json
   * [[ApplicationInfo]] representation.
   */
  def appHistoryInfoToPublicAppInfo(app: ApplicationHistoryInfo): ApplicationInfo = {
    ApplicationInfo(
      id = app.id,
      name = app.name,
      startTime = app.startTime,
      endTime = app.endTime,
      sparkUser = app.sparkUser,
      completed = app.completed)
  }

  /**
   * Converts the master's internal application record into the public json
   * [[ApplicationInfo]] representation.
   *
   * @param internal the master's internal bookkeeping record for the app
   * @param completed whether the application has finished
   */
  def convertApplicationInfo(
      internal: InternalApplicationInfo,
      completed: Boolean): ApplicationInfo = {
    ApplicationInfo(
      id = internal.id,
      name = internal.desc.name,
      startTime = internal.startTime,
      endTime = internal.endTime,
      sparkUser = internal.desc.user,
      completed = completed)
  }
}

core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ private[spark] abstract class ApplicationHistoryProvider {
3535
*
3636
* @return List of all known applications.
3737
*/
38-
def getListing(): Iterable[ApplicationHistoryInfo]
38+
def getListing(refresh: Boolean): Iterable[ApplicationHistoryInfo]
3939

4040
/**
4141
* Returns the Spark UI for a specific application.

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
122122
}
123123
}
124124

125-
override def getListing() = applications.values
125+
override def getListing(refresh: Boolean) = {
126+
if (refresh) checkForLogs()
127+
applications.values
128+
}
126129

127130
override def getAppUI(appId: String): Option[SparkUI] = {
128131
try {

core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ package org.apache.spark.deploy.history
1919

2020
import javax.servlet.http.HttpServletRequest
2121

22+
import org.json4s.JValue
23+
import org.json4s.JsonDSL._
24+
2225
import scala.xml.Node
2326

2427
import org.apache.spark.ui.{WebUIPage, UIUtils}
@@ -34,7 +37,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
3437
val requestedIncomplete =
3538
Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean
3639

37-
val allApps = parent.getApplicationList().filter(_.completed != requestedIncomplete)
40+
val allApps = parent.getApplicationList(true).filter(_.completed != requestedIncomplete)
3841
val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0
3942
val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size))
4043

@@ -67,7 +70,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
6770

6871
<h4>
6972
Showing {actualFirst + 1}-{last + 1} of {allApps.size}
70-
{if (requestedIncomplete) "(Incomplete applications)"}
73+
({if (requestedIncomplete) "Incomplete" else "Complete"} applications)
7174
<span style="float: right">
7275
{
7376
if (actualPage > 1) {
@@ -90,7 +93,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
9093
</h4> ++
9194
appTable
9295
} else {
93-
<h4>No completed applications found!</h4> ++
96+
<h4>No {if (requestedIncomplete) "running" else "completed"} applications found!</h4> ++
9497
<p>Did you specify the correct logging directory?
9598
Please verify your setting of <span style="font-style:italic">
9699
spark.history.fs.logDirectory</span> and whether you have the permissions to

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ import java.util.NoSuchElementException
2121
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
2222

2323
import com.google.common.cache._
24+
import org.apache.spark.deploy.master.ui.MasterApplicationJsonRoute
25+
import org.apache.spark.status.api.ApplicationInfo
26+
import org.apache.spark.status.{UIRoot, JsonRequestHandler}
2427
import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
2528

2629
import org.apache.spark.{Logging, SecurityManager, SparkConf}
@@ -45,7 +48,7 @@ class HistoryServer(
4548
provider: ApplicationHistoryProvider,
4649
securityManager: SecurityManager,
4750
port: Int)
48-
extends WebUI(securityManager, port, conf) with Logging {
51+
extends WebUI(securityManager, port, conf) with Logging with UIRoot {
4952

5053
// How many applications to retain
5154
private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)
@@ -71,6 +74,7 @@ class HistoryServer(
7174
protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = {
7275
val parts = Option(req.getPathInfo()).getOrElse("").split("/")
7376
if (parts.length < 2) {
77+
logError("bad path info!")
7478
res.sendError(HttpServletResponse.SC_BAD_REQUEST,
7579
s"Unexpected path info in request (URI = ${req.getRequestURI()}")
7680
return
@@ -98,6 +102,10 @@ class HistoryServer(
98102
}
99103
}
100104

105+
def getSparkUI(appKey: String): Option[SparkUI] = {
106+
Option(appCache.get(appKey))
107+
}
108+
101109
initialize()
102110

103111
/**
@@ -107,7 +115,13 @@ class HistoryServer(
107115
* this UI with the event logs in the provided base directory.
108116
*/
109117
def initialize() {
118+
// Earlier handlers take precedence.
110119
attachPage(new HistoryPage(this))
120+
121+
val jsonHandler = new JsonRequestHandler(this, securityManager)
122+
attachHandler(jsonHandler.jsonContextHandler)
123+
124+
111125
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
112126

113127
val contextHandler = new ServletContextHandler
@@ -145,7 +159,11 @@ class HistoryServer(
145159
*
146160
* @return List of all known applications.
147161
*/
148-
def getApplicationList() = provider.getListing()
162+
def getApplicationList(refresh: Boolean) = provider.getListing(refresh)
163+
164+
def getApplicationInfoList: Seq[ApplicationInfo] = {
165+
getApplicationList(true).map{AllApplicationsJsonRoute.appHistoryInfoToPublicAppInfo}.toSeq
166+
}
149167

150168
/**
151169
* Returns the provider configuration to show in the listing page.
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.history
18+
19+
import javax.servlet.http.HttpServletRequest
20+
21+
import org.apache.spark.status.{UIRoot, JsonRequestHandler, StatusJsonRoute}
22+
import org.apache.spark.status.api.ApplicationInfo
23+
24+
/**
 * Json route that serves the [[ApplicationInfo]] for a single application, identified
 * by the app id extracted from the request path.
 */
class OneApplicationJsonRoute(val uiRoot: UIRoot) extends StatusJsonRoute[ApplicationInfo] {

  override def renderJson(request: HttpServletRequest): ApplicationInfo = {
    JsonRequestHandler.extractAppId(request.getPathInfo) match {
      case Some(appId) =>
        val found = uiRoot.getApplicationInfoList.find(_.id == appId)
        found.getOrElse(throw new IllegalArgumentException("unknown app: " + appId))
      case None =>
        throw new IllegalArgumentException("no application id specified")
    }
  }
}

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -715,9 +715,9 @@ private[spark] class Master(
715715

716716
/**
717717
* Rebuild a new SparkUI from the given application's event logs.
718-
* Return whether this is successful.
718+
* Return the UI if successful, else None
719719
*/
720-
def rebuildSparkUI(app: ApplicationInfo): Boolean = {
720+
def rebuildSparkUI(app: ApplicationInfo): Option[SparkUI] = {
721721
val appName = app.desc.name
722722
val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
723723
try {
@@ -726,7 +726,7 @@ private[spark] class Master(
726726
.getOrElse {
727727
// Event logging is not enabled for this application
728728
app.desc.appUiUrl = notFoundBasePath
729-
return false
729+
return None
730730
}
731731

732732
val fs = Utils.getHadoopFileSystem(eventLogFile, hadoopConf)
@@ -738,7 +738,7 @@ private[spark] class Master(
738738
logWarning(msg)
739739
msg = URLEncoder.encode(msg, "UTF-8")
740740
app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
741-
return false
741+
return None
742742
}
743743

744744
val (logInput, sparkVersion) = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
@@ -754,7 +754,7 @@ private[spark] class Master(
754754
webUi.attachSparkUI(ui)
755755
// Application UI is successfully rebuilt, so link the Master UI to it
756756
app.desc.appUiUrl = ui.basePath
757-
true
757+
Some(ui)
758758
} catch {
759759
case fnf: FileNotFoundException =>
760760
// Event logging is enabled for this application, but no event logs are found
@@ -764,7 +764,7 @@ private[spark] class Master(
764764
msg += " Did you specify the correct logging directory?"
765765
msg = URLEncoder.encode(msg, "UTF-8")
766766
app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
767-
false
767+
None
768768
case e: Exception =>
769769
// Relay exception message to application UI page
770770
val title = s"Application history load error (${app.id})"
@@ -773,7 +773,7 @@ private[spark] class Master(
773773
logError(msg, e)
774774
msg = URLEncoder.encode(msg, "UTF-8")
775775
app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title"
776-
false
776+
None
777777
}
778778
}
779779

0 commit comments

Comments
 (0)