Skip to content

[SPARK-6197][CORE] handle json exception when hisotry file not finished writing #4927

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -764,8 +764,9 @@ private[spark] class Master(
val replayBus = new ReplayListenerBus()
val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
appName + status, HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
val maybeTruncated = eventLogFile.endsWith(EventLoggingListener.IN_PROGRESS)
try {
replayBus.replay(logInput, eventLogFile)
replayBus.replay(logInput, eventLogFile, maybeTruncated)
} finally {
logInput.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.scheduler

import com.fasterxml.jackson.core.JsonParseException
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import java.io.{InputStream, IOException}

import scala.io.Source
Expand All @@ -40,15 +41,31 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
*
* @param logData Stream containing event log data.
* @param sourceName Filename (or other source identifier) from whence @logData is being read
* @param maybeTruncated Indicate whether log file might be truncated (some abnormal situations
* encountered, log file might not finished writing) or not
*/
def replay(logData: InputStream, sourceName: String): Unit = {
def replay(
logData: InputStream,
sourceName: String,
maybeTruncated: Boolean = false): Unit = {
var currentLine: String = null
var lineNumber: Int = 1
try {
val lines = Source.fromInputStream(logData).getLines()
lines.foreach { line =>
currentLine = line
postToAll(JsonProtocol.sparkEventFromJson(parse(line)))
while (lines.hasNext) {
currentLine = lines.next()
try {
postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
} catch {
case jpe: JsonParseException =>
// We can only ignore exception from last line of the file that might be truncated
if (!maybeTruncated || lines.hasNext) {
throw jpe
} else {
logWarning(s"Got JsonParseException from log file $sourceName" +
s" at line $lineNumber, the file might not have finished writing cleanly.")
}
}
lineNumber += 1
}
} catch {
Expand Down