Skip to content

Commit 2973024

Browse files
handle json exception when file not finished writing
1 parent eb48fd6 commit 2973024

File tree

2 files changed

+23
-5
lines changed

2 files changed

+23
-5
lines changed

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -765,7 +765,8 @@ private[spark] class Master(
765765
val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
766766
appName + status, HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
767767
try {
768-
replayBus.replay(logInput, eventLogFile)
768+
replayBus.replay(logInput, eventLogFile,
769+
eventLogFile.endsWith(EventLoggingListener.IN_PROGRESS))
769770
} finally {
770771
logInput.close()
771772
}

core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.scheduler
1919

20+
import com.fasterxml.jackson.core.JsonParseException
2021
import java.io.{InputStream, IOException}
2122

2223
import scala.io.Source
@@ -40,15 +41,31 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
4041
*
4142
* @param logData Stream containing event log data.
4243
* @param sourceName Filename (or other source identifier) from whence @logData is being read
44+
* @param sourceTruncated Indicate whether log file might be truncated (some abnormal situation
45+
* encountered, log file not finish writing) or not
4346
*/
44-
def replay(logData: InputStream, sourceName: String): Unit = {
47+
def replay(
48+
logData: InputStream,
49+
sourceName: String,
50+
sourceTruncated: Boolean = false): Unit = {
4551
var currentLine: String = null
4652
var lineNumber: Int = 1
4753
try {
4854
val lines = Source.fromInputStream(logData).getLines()
49-
lines.foreach { line =>
50-
currentLine = line
51-
postToAll(JsonProtocol.sparkEventFromJson(parse(line)))
55+
while (lines.hasNext) {
56+
currentLine = lines.next()
57+
try {
58+
postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
59+
} catch {
60+
case jpe: JsonParseException =>
61+
// We can only ignore exception from last line of the file that might be truncated
62+
if (!sourceTruncated || lines.hasNext) {
63+
throw jpe
64+
} else {
65+
logWarning(s"Get json parse exception from log file $sourceName" +
66+
s" in line $lineNumber, the file might not finished writing.")
67+
}
68+
}
5269
lineNumber += 1
5370
}
5471
} catch {

0 commit comments

Comments
 (0)