Skip to content

Commit d8c9cd0

Browse files
committed
Replaying events only return information when app is started
1 parent 4c5889e commit d8c9cd0

File tree

1 file changed

+24
-14
lines changed

1 file changed

+24
-14
lines changed

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
160160
replayBus.addListener(appListener)
161161
val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)
162162

163-
ui.setAppName(s"${appInfo.name} ($appId)")
163+
appInfo.map { app => ui.setAppName(s"${app.name} ($appId)") }
164164

165165
val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
166166
ui.getSecurityManager.setAcls(uiAclsEnabled)
@@ -282,8 +282,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
282282
val newAttempts = logs.flatMap { fileStatus =>
283283
try {
284284
val res = replay(fileStatus, bus)
285-
logInfo(s"Application log ${res.logPath} loaded successfully.")
286-
Some(res)
285+
res.map { r =>
286+
logInfo(s"Application log ${r.logPath} loaded successfully.")
287+
}.getOrElse {
288+
logInfo(
289+
s"Failed to load application log ${fileStatus.getPath}." +
290+
"The application may have not started.")
291+
}
292+
res
287293
} catch {
288294
case e: Exception =>
289295
logError(
@@ -431,7 +437,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
431437
* Replays the events in the specified log file and returns information about the associated
432438
* application.
433439
*/
434-
private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
440+
private def replay(
441+
eventLog: FileStatus,
442+
bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
435443
val logPath = eventLog.getPath()
436444
logInfo(s"Replaying log path: $logPath")
437445
val logInput =
@@ -445,16 +453,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
445453
val appCompleted = isApplicationCompleted(eventLog)
446454
bus.addListener(appListener)
447455
bus.replay(logInput, logPath.toString, !appCompleted)
448-
new FsApplicationAttemptInfo(
449-
logPath.getName(),
450-
appListener.appName.getOrElse(NOT_STARTED),
451-
appListener.appId.getOrElse(logPath.getName()),
452-
appListener.appAttemptId,
453-
appListener.startTime.getOrElse(-1L),
454-
appListener.endTime.getOrElse(-1L),
455-
getModificationTime(eventLog).get,
456-
appListener.sparkUser.getOrElse(NOT_STARTED),
457-
appCompleted)
456+
appListener.appId.map { appId =>
457+
new FsApplicationAttemptInfo(
458+
logPath.getName(),
459+
appListener.appName.getOrElse(NOT_STARTED),
460+
appId,
461+
appListener.appAttemptId,
462+
appListener.startTime.getOrElse(-1L),
463+
appListener.endTime.getOrElse(-1L),
464+
getModificationTime(eventLog).get,
465+
appListener.sparkUser.getOrElse(NOT_STARTED),
466+
appCompleted)
467+
}
458468
} finally {
459469
logInput.close()
460470
}

0 commit comments

Comments
 (0)