[SPARK-1276] Add a HistoryServer to render persisted UI #204

Closed · wants to merge 26 commits

Commits (26)
c086bd5
Add HistoryServer and scripts ++ Refactor WebUI interface
andrewor14 Mar 20, 2014
8aac163
Add basic application table
andrewor14 Mar 20, 2014
7584418
Report application start/end times to HistoryServer
andrewor14 Mar 21, 2014
60bc6d5
First complete implementation of HistoryServer (only for finished apps)
andrewor14 Mar 22, 2014
5dbfbb4
Merge branch 'master' of github.com:apache/spark
andrewor14 Mar 22, 2014
d5154da
Styling and comments
andrewor14 Mar 22, 2014
a9eae7e
Merge branch 'master' of github.com:apache/spark
andrewor14 Mar 24, 2014
1b2f391
Minor changes
andrewor14 Mar 24, 2014
0670743
Decouple page rendering from loading files from disk
andrewor14 Mar 31, 2014
81b568b
Fix strange error messages...
andrewor14 Mar 31, 2014
050419e
Merge github.com:apache/spark
andrewor14 Mar 31, 2014
e2f4ff9
Merge github.com:apache/spark
andrewor14 Apr 4, 2014
bc46fc8
Merge github.com:apache/spark
andrewor14 Apr 7, 2014
a3598de
Do not close file system with ReplayBus + fix bind address
andrewor14 Apr 7, 2014
248cb3d
Limit number of live applications + add configurability
andrewor14 Apr 7, 2014
19e1fb4
Address Thomas' comments
andrewor14 Apr 9, 2014
6edf052
Merge github.com:apache/spark
andrewor14 Apr 9, 2014
567474a
Merge github.com:apache/spark
andrewor14 Apr 9, 2014
2282300
Add documentation for the HistoryServer
andrewor14 Apr 9, 2014
d02dbaa
Expose Spark version and include it in event logs
andrewor14 Apr 9, 2014
2dfb494
Decouple checking for application completion from replaying
andrewor14 Apr 10, 2014
f7f5bf0
Make history server's web UI port a Spark configuration
andrewor14 Apr 10, 2014
19d5dd0
Merge github.com:apache/spark
andrewor14 Apr 10, 2014
69d1b41
Do not block on posting SparkListenerApplicationEnd
andrewor14 Apr 10, 2014
b158d98
Address Patrick's comments
andrewor14 Apr 10, 2014
7b7234c
Finished -> Completed
andrewor14 Apr 10, 2014
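
Taken together, these commits make SparkContext persist its events and add a standalone HistoryServer that replays them. Below is a minimal sketch of an application configured for event logging; spark.eventLog.enabled is the key that appears in the SparkContext diff further down, while spark.eventLog.dir, the log directory path, and the rest of the setup are illustrative assumptions rather than details taken from this pull request:

import org.apache.spark.{SparkConf, SparkContext}

object EventLogExample {
  def main(args: Array[String]): Unit = {
    // Enable the EventLoggingListener so the application's events are written out
    // and can later be replayed by the HistoryServer.
    val conf = new SparkConf()
      .setAppName("event-log-example")
      .setMaster("local[2]")
      .set("spark.eventLog.enabled", "true")          // key shown in the SparkContext diff below
      .set("spark.eventLog.dir", "/tmp/spark-events") // assumed key and path, for illustration only

    val sc = new SparkContext(conf)
    sc.parallelize(1 to 100).count()
    // stop() posts SparkListenerApplicationEnd synchronously (see the diff below).
    sc.stop()
  }
}

The HistoryServer is then expected to scan the same log directory to render the persisted UI (compare the eventLogDir field added to ApplicationDescription below).
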
8 changes: 6 additions & 2 deletions bin/spark-class
@@ -47,9 +47,9 @@ DEFAULT_MEM=${SPARK_MEM:-512m}

SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true"

# Add java opts and memory settings for master, worker, executors, and repl.
# Add java opts and memory settings for master, worker, history server, executors, and repl.
case "$1" in
# Master and Worker use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
# Master, Worker, and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
Contributor:
It would be nice to have a small section of documentation about starting/stopping the history server, as well as this setting for configuring its memory.

'org.apache.spark.deploy.master.Master')
OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_MASTER_OPTS"
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
@@ -58,6 +58,10 @@ case "$1" in
OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_WORKER_OPTS"
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
;;
'org.apache.spark.deploy.history.HistoryServer')
OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_HISTORY_OPTS"
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
;;

# Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
'org.apache.spark.executor.CoarseGrainedExecutorBackend')
7 changes: 5 additions & 2 deletions bin/spark-class2.cmd
@@ -45,14 +45,17 @@ if "x%OUR_JAVA_MEM%"=="x" set OUR_JAVA_MEM=512m

set SPARK_DAEMON_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% -Dspark.akka.logLifecycleEvents=true

rem Add java opts and memory settings for master, worker, executors, and repl.
rem Master and Worker use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
rem Add java opts and memory settings for master, worker, history server, executors, and repl.
rem Master, Worker and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
if "%1"=="org.apache.spark.deploy.master.Master" (
set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_MASTER_OPTS%
if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
) else if "%1"=="org.apache.spark.deploy.worker.Worker" (
set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_WORKER_OPTS%
if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
) else if "%1"=="org.apache.spark.deploy.history.HistoryServer" (
set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_HISTORY_OPTS%
if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%

rem Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
) else if "%1"=="org.apache.spark.executor.CoarseGrainedExecutorBackend" (
26 changes: 22 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -219,15 +219,12 @@ class SparkContext(config: SparkConf) extends Logging {
private[spark] val eventLogger: Option[EventLoggingListener] = {
if (conf.getBoolean("spark.eventLog.enabled", false)) {
val logger = new EventLoggingListener(appName, conf)
logger.start()
listenerBus.addListener(logger)
Some(logger)
} else None
}

// Information needed to replay logged events, if any
private[spark] val eventLoggingInfo: Option[EventLoggingInfo] =
eventLogger.map { logger => Some(logger.info) }.getOrElse(None)

// At this point, all relevant SparkListeners have been registered, so begin releasing events
listenerBus.start()

@@ -292,6 +289,7 @@ class SparkContext(config: SparkConf) extends Logging {
cleaner.foreach(_.start())

postEnvironmentUpdate()
postApplicationStart()

/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration: Configuration = {
@@ -777,6 +775,9 @@ class SparkContext(config: SparkConf) extends Logging {
listenerBus.addListener(listener)
}

/** The version of Spark on which this application is running. */
def version = SparkContext.SPARK_VERSION

/**
* Return a map from the slave to the max memory available for caching and the remaining
* memory available for caching.
@@ -930,6 +931,7 @@ class SparkContext(config: SparkConf) extends Logging {

/** Shut down the SparkContext. */
def stop() {
postApplicationEnd()
ui.stop()
// Do this only if not stopped already - best case effort.
// prevent NPE if stopped more than once.
@@ -1175,6 +1177,20 @@ class SparkContext(config: SparkConf) extends Logging {
/** Register a new RDD, returning its RDD ID */
private[spark] def newRddId(): Int = nextRddId.getAndIncrement()

/** Post the application start event */
private def postApplicationStart() {
listenerBus.post(SparkListenerApplicationStart(appName, startTime, sparkUser))
}

/**
* Post the application end event to all listeners immediately, rather than adding it
* to the event queue for it to be asynchronously processed eventually. Otherwise, a race
* condition exists in which the listeners may stop before this event has been propagated.
*/
private def postApplicationEnd() {
Contributor:
Doesn't this create a different race condition, where events already in the queue get dropped?

Contributor:
For example, say the user code does this:

val x = rdd.count()
sc.stop()

Couldn't there be a stage completion event that is in the queue and gets skipped here?

Contributor (author):
This is different from the shutdown event. The StageCompleted event will still be processed on the regular listener bus thread; it is true, however, that the ApplicationEnd event may be processed before it. A proper way of dealing with this is in #366, where ordering is preserved and all events are guaranteed to be processed to completion.
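
To make the ordering concern concrete, here is a minimal, self-contained Scala sketch (not Spark's LiveListenerBus, and not code from this PR; the object and event names are made up for illustration) of why an event delivered synchronously can be handled before events still sitting in an asynchronous queue:

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean

object ListenerOrderingSketch {
  def main(args: Array[String]): Unit = {
    val queue = new LinkedBlockingQueue[String]()
    val running = new AtomicBoolean(true)

    def handle(event: String): Unit = println("handled: " + event)

    // Stand-in for the listener bus thread: drains queued events asynchronously.
    val listenerThread = new Thread(new Runnable {
      def run(): Unit = {
        while (running.get() || !queue.isEmpty) {
          val event = queue.poll()
          if (event != null) handle(event) else Thread.sleep(10)
        }
      }
    })
    listenerThread.start()

    queue.put("StageCompleted")   // queued, handled asynchronously by listenerThread
    handle("ApplicationEnd")      // delivered synchronously, may be handled first

    running.set(false)
    listenerThread.join()
  }
}

No queued event is dropped in this sketch, but the synchronously delivered one can overtake those ahead of it in the queue, which is exactly the caveat acknowledged above and addressed more thoroughly in #366.
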

listenerBus.post(SparkListenerApplicationEnd(System.currentTimeMillis))
}

/** Post the environment update event once the task scheduler is ready */
private def postEnvironmentUpdate() {
if (taskScheduler != null) {
@@ -1200,6 +1216,8 @@
*/
object SparkContext extends Logging {

private[spark] val SPARK_VERSION = "1.0.0"

private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"

private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -17,16 +17,14 @@

package org.apache.spark.deploy

import org.apache.spark.scheduler.EventLoggingInfo

private[spark] class ApplicationDescription(
val name: String,
val maxCores: Option[Int],
val memoryPerSlave: Int,
val command: Command,
val sparkHome: Option[String],
var appUiUrl: String,
val eventLogInfo: Option[EventLoggingInfo] = None)
val eventLogDir: Option[String] = None)
extends Serializable {

val user = System.getProperty("user.name", "<unknown>")
50 changes: 50 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy

import org.apache.spark.ui.{SparkUI, WebUI}

private[spark] abstract class SparkUIContainer(name: String) extends WebUI(name) {

/** Attach a SparkUI to this container. Only valid after bind(). */
def attachUI(ui: SparkUI) {
assert(serverInfo.isDefined,
"%s must be bound to a server before attaching SparkUIs".format(name))
val rootHandler = serverInfo.get.rootHandler
for (handler <- ui.handlers) {
rootHandler.addHandler(handler)
if (!handler.isStarted) {
handler.start()
}
}
}

/** Detach a SparkUI from this container. Only valid after bind(). */
def detachUI(ui: SparkUI) {
assert(serverInfo.isDefined,
"%s must be bound to a server before detaching SparkUIs".format(name))
val rootHandler = serverInfo.get.rootHandler
for (handler <- ui.handlers) {
if (handler.isStarted) {
handler.stop()
}
rootHandler.removeHandler(handler)
}
}

}