[SPARK-1768] Make event logger use a single file. #1

Closed
wants to merge 12 commits
@@ -24,7 +24,7 @@ private[spark] class ApplicationDescription(
val command: Command,
val sparkHome: Option[String],
var appUiUrl: String,
-val eventLogDir: Option[String] = None)
+val eventLogFile: Option[String] = None)
extends Serializable {

val user = System.getProperty("user.name", "<unknown>")
@@ -0,0 +1,273 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.history

import java.io.FileNotFoundException
import java.util.concurrent.atomic.AtomicReference

import scala.collection.mutable

import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.Utils

private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
with Logging {

// Interval between each check for event log updates
private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval", 10) * 1000
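// (the config value is given in seconds and converted to milliseconds here;
// the default is 10 seconds between checks)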

private val logDir = conf.get("spark.history.fs.logDirectory")
private val fs = Utils.getHadoopFileSystem(logDir)

// A timestamp of when the disk was last accessed to check for log updates
private var lastLogCheckTimeMs = -1L

// List of applications, in order from newest to oldest.
private val appList = new AtomicReference[Seq[ApplicationHistoryInfo]](Nil)

// Constants used to parse Spark 1.0.0 log directories.
private[history] val LOG_PREFIX = "EVENT_LOG_"
private[history] val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
private[history] val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"

/**
* A background thread that periodically checks for event log updates on disk.
*
* If a log check is invoked manually in the middle of a period, this thread re-adjusts the
* time at which it performs the next log check to maintain the same period as before.
*
* TODO: Add a mechanism to update manually.
*/
private val logCheckingThread = new Thread("LogCheckingThread") {
override def run() = Utils.logUncaughtExceptions {
while (true) {
val now = getMonotonicTime()
if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) {
Thread.sleep(UPDATE_INTERVAL_MS)
} else {
// If the user has manually checked for logs recently, wait until
// UPDATE_INTERVAL_MS after the last check time
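// (e.g. with UPDATE_INTERVAL_MS = 10000 and a manual check 4 seconds ago,
// this sleeps the remaining 6 seconds before the next automatic check)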
Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now)
}
checkForLogs()
}
}
}

initialize()

private def initialize() {
// Validate the log directory.
val path = new Path(logDir)
if (!fs.exists(path)) {
throw new IllegalArgumentException(
"Logging directory specified does not exist: %s".format(logDir))
}
if (!fs.getFileStatus(path).isDir) {
throw new IllegalArgumentException(
"Logging directory specified is not a directory: %s".format(logDir))
}

checkForLogs()

// Treat 0 as "disable the background thread", mostly for testing.
if (UPDATE_INTERVAL_MS > 0) {
logCheckingThread.setDaemon(true)
logCheckingThread.start()
}
}

override def getListing(offset: Int, count: Int) = {
val list = appList.get()
val theOffset = if (offset < list.size) offset else 0
(list.slice(theOffset, Math.min(theOffset + count, list.size)), theOffset, list.size)
}
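// Example: with 45 cached apps, getListing(20, 20) returns (apps 20-39, 20, 45),
// while an out-of-range getListing(100, 20) falls back to (apps 0-19, 0, 45).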

override def getAppInfo(appId: String): ApplicationHistoryInfo = {
try {
val appLogDir = fs.getFileStatus(new Path(logDir, appId))
loadAppInfo(appLogDir, true)
} catch {
case e: FileNotFoundException => null
}
}

/**
* Builds the application list based on the current contents of the log directory.
* Tries to reuse as much of the data already in memory as possible, by not re-reading
* applications that haven't been updated since the last time the logs were checked.
*/
private[history] def checkForLogs() = synchronized {
lastLogCheckTimeMs = getMonotonicTime()
logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
try {
val matcher = EventLoggingListener.LOG_FILE_NAME_REGEX
val logInfos = fs.listStatus(new Path(logDir))
.filter { entry =>
if (entry.isDir()) {
fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE))
} else {
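// LOG_FILE_NAME_REGEX is used as a Scala extractor here: the pattern match
// binds its capture groups, and a non-matching file name throws a MatchError,
// which the catch below turns into "skip this entry". A null "inprogress"
// group means the log file is complete and safe to list.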
try {
val matcher(version, codecName, inprogress) = entry.getPath().getName()
inprogress == null
} catch {
case e: Exception => false
}
}
}

val currentApps = Map[String, ApplicationHistoryInfo](
appList.get().map(app => (app.id -> app)): _*)

// For any application that either (i) is not listed or (ii) has changed since the last time
// the listing was created (defined by the log dir's modification time), load the app's info.
// Otherwise just reuse what's already in memory.
val newApps = new mutable.ListBuffer[ApplicationHistoryInfo]
for (log <- logInfos) {
val curr = currentApps.getOrElse(log.getPath().getName(), null)
if (curr == null || curr.lastUpdated < log.getModificationTime()) {
try {
val info = loadAppInfo(log, false)
if (info != null) {
newApps += info
}
} catch {
case e: Exception => logError(s"Failed to load app info from directory $log.", e)
}
} else {
newApps += curr
}
}

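// Swap in the new listing atomically, so concurrent readers always see a
// complete, consistently sorted snapshot.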
appList.set(newApps.sortBy { info => -info.lastUpdated })
} catch {
case t: Throwable => logError("Exception in checking for event log updates", t)
}
}

/**
* Parse the application's logs to find out the information we need to build the
* listing page.
*/
private def loadAppInfo(log: FileStatus, renderUI: Boolean): ApplicationHistoryInfo = {
val elogInfo = if (log.isFile()) {
EventLoggingListener.parseLoggingInfo(log.getPath())
} else {
loadOldLoggingInfo(log.getPath())
}

if (elogInfo == null) {
return null
}


val (logFile, lastUpdated) = if (log.isFile()) {
(elogInfo.path, log.getModificationTime())
} else {
// For old-style log directories, we need to find the actual log file.
val status = fs.listStatus(elogInfo.path)
.filter(e => e.getPath().getName().startsWith(LOG_PREFIX))(0)
(status.getPath(), status.getModificationTime())
}

val appId = elogInfo.path.getName

val replayBus = new ReplayListenerBus(logFile, fs, elogInfo.compressionCodec)
val appListener = new ApplicationEventListener
replayBus.addListener(appListener)

val ui: SparkUI = if (renderUI) {
val conf = this.conf.clone()
val appSecManager = new SecurityManager(conf)
new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId)
// Do not call ui.bind() to avoid creating a new server for each application
} else {
null
}
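// The app listener (and, when a UI is requested, the UI's own listeners) are
// attached to replayBus before replay() runs below, so they receive every
// event as the log is read back.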

replayBus.replay()
ApplicationHistoryInfo(appId,
appListener.appName,
appListener.startTime,
appListener.endTime,
lastUpdated,
appListener.sparkUser,
if (renderUI) appListener.viewAcls else null,
ui)
}

/**
* Load the app log information from a Spark 1.0.0 log directory, for backwards compatibility.
* This assumes that the log directory contains a single event log file, which is the case for
* directories generated by the code in that release.
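*
* An illustrative Spark 1.0.0 log directory (file names are made up for the
* example; only the marker prefixes are fixed):
*
*   app-1234/
*     EVENT_LOG_1               (the event log itself)
*     SPARK_VERSION_1.0.0       (empty version marker)
*     COMPRESSION_CODEC_lzf     (empty marker, present only when compression is on)
*     APPLICATION_COMPLETE      (empty marker, present only when the app finished)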
*/
private[history] def loadOldLoggingInfo(dir: Path): EventLoggingInfo = {
val children = fs.listStatus(dir)
var eventLogPath: Path = null
var sparkVersion: String = null
var codecName: String = null
var applicationCompleted: Boolean = false

children.foreach(child => child.getPath().getName() match {
case name if name.startsWith(LOG_PREFIX) =>
eventLogPath = child.getPath()

case ver if ver.startsWith(SPARK_VERSION_PREFIX) =>
sparkVersion = ver.substring(SPARK_VERSION_PREFIX.length())

case codec if codec.startsWith(COMPRESSION_CODEC_PREFIX) =>
codecName = codec.substring(COMPRESSION_CODEC_PREFIX.length())

case complete if complete == APPLICATION_COMPLETE =>
applicationCompleted = true

case _ =>
})

val codec = try {
if (codecName != null) {
Some(CompressionCodec.createCodec(conf, codecName))
} else None
} catch {
case e: Exception =>
logError(s"Unknown compression codec $codecName.")
return null
}

if (eventLogPath == null || sparkVersion == null) {
logInfo(s"$dir is not a Spark application log directory.")
return null
}

EventLoggingInfo(dir, sparkVersion, codec, applicationCompleted)
}

/** Returns the system's monotonically increasing time, in milliseconds. */
private def getMonotonicTime() = System.nanoTime() / (1000 * 1000)

}
@@ -25,20 +25,28 @@ import org.apache.spark.ui.{WebUIPage, UIUtils}

private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {

+val pageSize = 20

def render(request: HttpServletRequest): Seq[Node] = {
-val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated }
-val appTable = UIUtils.listingTable(appHeader, appRow, appRows)
+val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt
+val requestedFirst = (requestedPage - 1) * pageSize
+val (apps, actualFirst, totalCount) = parent.getApplicationList(requestedFirst, pageSize)
+val actualPage = (actualFirst / pageSize) + 1
+val last = Math.min(actualFirst + pageSize, totalCount) - 1
+val pageCount = totalCount / pageSize + (if (totalCount % pageSize > 0) 1 else 0)
+
+val appTable = UIUtils.listingTable(appHeader, appRow, apps)
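// Example of the pagination math above: totalCount = 45 and pageSize = 20 give
// pageCount = 3; requesting page 2 yields requestedFirst = 20, actualPage = 2,
// last = 39, and the header renders "Showing 21-40 of 45".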
val content =
<div class="row-fluid">
<div class="span12">
<ul class="unstyled">
<li><strong>Event Log Location: </strong> {parent.baseLogDir}</li>
</ul>
{
-if (parent.appIdToInfo.size > 0) {
+if (totalCount > 0) {
<h4>
-Showing {parent.appIdToInfo.size}/{parent.getNumApplications}
-Completed Application{if (parent.getNumApplications > 1) "s" else ""}
+Showing {actualFirst + 1}-{last + 1} of {totalCount}
+<span style="float: right">
+{if (actualPage > 1) <a href={"/?page=" + (actualPage - 1)}>&lt;</a>}
+{if (actualPage < pageCount) <a href={"/?page=" + (actualPage + 1)}>&gt;</a>}
+</span>
</h4> ++
appTable
} else {
@@ -56,26 +64,23 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
"Completed",
"Duration",
"Spark User",
"Log Directory",
"Last Updated")

private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
-val appName = if (info.started) info.name else info.logDirPath.getName
-val uiAddress = parent.getAddress + info.ui.basePath
+val appName = if (info.started) info.name else info.id
+val uiAddress = "/history/" + info.id
val startTime = if (info.started) UIUtils.formatDate(info.startTime) else "Not started"
val endTime = if (info.completed) UIUtils.formatDate(info.endTime) else "Not completed"
val difference = if (info.started && info.completed) info.endTime - info.startTime else -1L
val duration = if (difference > 0) UIUtils.formatDuration(difference) else "---"
val sparkUser = if (info.started) info.sparkUser else "Unknown user"
-val logDirectory = info.logDirPath.getName
val lastUpdated = UIUtils.formatDate(info.lastUpdated)
<tr>
<td><a href={uiAddress}>{appName}</a></td>
<td>{startTime}</td>
<td>{endTime}</td>
<td>{duration}</td>
<td>{sparkUser}</td>
-<td>{logDirectory}</td>
<td>{lastUpdated}</td>
</tr>
}