Skip to content

Commit 100fc58

Browse files
HeartSaVioRMarcelo Vanzin
authored and
Marcelo Vanzin
committed
[SPARK-28869][CORE] Roll over event log files
### What changes were proposed in this pull request? This patch is a part of [SPARK-28594](https://issues.apache.org/jira/browse/SPARK-28594) and design doc for SPARK-28594 is linked here: https://docs.google.com/document/d/12bdCC4nA58uveRxpeo8k7kGOI2NRTXmXyBOweSi4YcY/edit?usp=sharing This patch proposes adding new feature to event logging, rolling event log files via configured file size. Previously event logging is done with single file and related codebase (`EventLoggingListener`/`FsHistoryProvider`) is tightly coupled with it. This patch adds layer on both reader (`EventLogFileReader`) and writer (`EventLogFileWriter`) to decouple implementation details between "handling events" and "how to read/write events from/to file". This patch adds two properties, `spark.eventLog.rollLog` and `spark.eventLog.rollLog.maxFileSize` which provides configurable behavior of rolling log. The feature is disabled by default, as we only expect huge event log for huge/long-running application. For other cases single event log file would be sufficient and still simpler. ### Why are the changes needed? This is a part of SPARK-28594 which addresses event log growing infinitely for long-running application. This patch itself also provides some option for the situation where event log file gets huge and consume their storage. End users may give up replaying their events and want to delete the event log file, but given application is still running and writing the file, it's not safe to delete the file. End users will be able to delete some of old files after applying rolling over event log. ### Does this PR introduce any user-facing change? No, as the new feature is turned off by default. ### How was this patch tested? Added unit tests, as well as basic manual tests. Basic manual tests - ran SHS, ran structured streaming query with roll event log enabled, verified split files are generated as well as SHS can load these files, with handling app status as incomplete/complete. Closes #25670 from HeartSaVioR/SPARK-28869. Lead-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com> Co-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com> Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
1 parent 2f0a38c commit 100fc58

14 files changed

+1719
-410
lines changed
Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.history
19+
20+
import java.io.{BufferedInputStream, InputStream}
21+
import java.util.concurrent.ConcurrentHashMap
22+
import java.util.zip.{ZipEntry, ZipOutputStream}
23+
24+
import com.google.common.io.ByteStreams
25+
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
26+
import org.apache.hadoop.hdfs.DFSInputStream
27+
28+
import org.apache.spark.SparkConf
29+
import org.apache.spark.deploy.history.EventLogFileWriter.codecName
30+
import org.apache.spark.io.CompressionCodec
31+
import org.apache.spark.util.Utils
32+
33+
/** The base class of reader which will read the information of event log file(s). */
34+
abstract class EventLogFileReader(
35+
protected val fileSystem: FileSystem,
36+
val rootPath: Path) {
37+
38+
protected def fileSizeForDFS(path: Path): Option[Long] = {
39+
Utils.tryWithResource(fileSystem.open(path)) { in =>
40+
in.getWrappedStream match {
41+
case dfsIn: DFSInputStream => Some(dfsIn.getFileLength)
42+
case _ => None
43+
}
44+
}
45+
}
46+
47+
protected def addFileAsZipEntry(
48+
zipStream: ZipOutputStream,
49+
path: Path,
50+
entryName: String): Unit = {
51+
Utils.tryWithResource(fileSystem.open(path, 1 * 1024 * 1024)) { inputStream =>
52+
zipStream.putNextEntry(new ZipEntry(entryName))
53+
ByteStreams.copy(inputStream, zipStream)
54+
zipStream.closeEntry()
55+
}
56+
}
57+
58+
/** Returns the last index of event log files. None for single event log file. */
59+
def lastIndex: Option[Long]
60+
61+
/**
62+
* Returns the size of file for the last index of event log files. Returns its size for
63+
* single event log file.
64+
*/
65+
def fileSizeForLastIndex: Long
66+
67+
/** Returns whether the application is completed. */
68+
def completed: Boolean
69+
70+
/**
71+
* Returns the size of file for the last index (itself for single event log file) of event log
72+
* files, only when underlying input stream is DFSInputStream. Otherwise returns None.
73+
*/
74+
def fileSizeForLastIndexForDFS: Option[Long]
75+
76+
/**
77+
* Returns the modification time for the last index (itself for single event log file)
78+
* of event log files.
79+
*/
80+
def modificationTime: Long
81+
82+
/**
83+
* This method compresses the files passed in, and writes the compressed data out into the
84+
* ZipOutputStream passed in. Each file is written as a new ZipEntry with its name being
85+
* the name of the file being compressed.
86+
*/
87+
def zipEventLogFiles(zipStream: ZipOutputStream): Unit
88+
89+
/** Returns all available event log files. */
90+
def listEventLogFiles: Seq[FileStatus]
91+
92+
/** Returns the short compression name if being used. None if it's uncompressed. */
93+
def compressionCodec: Option[String]
94+
95+
/** Returns the size of all event log files. */
96+
def totalSize: Long
97+
}
98+
99+
object EventLogFileReader {
100+
// A cache for compression codecs to avoid creating the same codec many times
101+
private val codecMap = new ConcurrentHashMap[String, CompressionCodec]()
102+
103+
def apply(
104+
fs: FileSystem,
105+
path: Path,
106+
lastIndex: Option[Long]): EventLogFileReader = {
107+
lastIndex match {
108+
case Some(_) => new RollingEventLogFilesFileReader(fs, path)
109+
case None => new SingleFileEventLogFileReader(fs, path)
110+
}
111+
}
112+
113+
def apply(fs: FileSystem, path: Path): Option[EventLogFileReader] = {
114+
apply(fs, fs.getFileStatus(path))
115+
}
116+
117+
def apply(fs: FileSystem, status: FileStatus): Option[EventLogFileReader] = {
118+
if (isSingleEventLog(status)) {
119+
Some(new SingleFileEventLogFileReader(fs, status.getPath))
120+
} else if (isRollingEventLogs(status)) {
121+
Some(new RollingEventLogFilesFileReader(fs, status.getPath))
122+
} else {
123+
None
124+
}
125+
}
126+
127+
/**
128+
* Opens an event log file and returns an input stream that contains the event data.
129+
*
130+
* @return input stream that holds one JSON record per line.
131+
*/
132+
def openEventLog(log: Path, fs: FileSystem): InputStream = {
133+
val in = new BufferedInputStream(fs.open(log))
134+
try {
135+
val codec = codecName(log).map { c =>
136+
codecMap.computeIfAbsent(c, CompressionCodec.createCodec(new SparkConf, _))
137+
}
138+
codec.map(_.compressedContinuousInputStream(in)).getOrElse(in)
139+
} catch {
140+
case e: Throwable =>
141+
in.close()
142+
throw e
143+
}
144+
}
145+
146+
private def isSingleEventLog(status: FileStatus): Boolean = {
147+
!status.isDirectory &&
148+
// FsHistoryProvider used to generate a hidden file which can't be read. Accidentally
149+
// reading a garbage file is safe, but we would log an error which can be scary to
150+
// the end-user.
151+
!status.getPath.getName.startsWith(".")
152+
}
153+
154+
private def isRollingEventLogs(status: FileStatus): Boolean = {
155+
RollingEventLogFilesWriter.isEventLogDir(status)
156+
}
157+
}
158+
159+
/**
160+
* The reader which will read the information of single event log file.
161+
*
162+
* This reader gets the status of event log file only once when required;
163+
* It may not give "live" status of file that could be changing concurrently, and
164+
* FileNotFoundException could occur if the log file is renamed before getting the
165+
* status of log file.
166+
*/
167+
class SingleFileEventLogFileReader(
168+
fs: FileSystem,
169+
path: Path) extends EventLogFileReader(fs, path) {
170+
private lazy val status = fileSystem.getFileStatus(rootPath)
171+
172+
override def lastIndex: Option[Long] = None
173+
174+
override def fileSizeForLastIndex: Long = status.getLen
175+
176+
override def completed: Boolean = !rootPath.getName.endsWith(EventLogFileWriter.IN_PROGRESS)
177+
178+
override def fileSizeForLastIndexForDFS: Option[Long] = {
179+
if (completed) {
180+
Some(fileSizeForLastIndex)
181+
} else {
182+
fileSizeForDFS(rootPath)
183+
}
184+
}
185+
186+
override def modificationTime: Long = status.getModificationTime
187+
188+
override def zipEventLogFiles(zipStream: ZipOutputStream): Unit = {
189+
addFileAsZipEntry(zipStream, rootPath, rootPath.getName)
190+
}
191+
192+
override def listEventLogFiles: Seq[FileStatus] = Seq(status)
193+
194+
override def compressionCodec: Option[String] = EventLogFileWriter.codecName(rootPath)
195+
196+
override def totalSize: Long = fileSizeForLastIndex
197+
}
198+
199+
/**
200+
* The reader which will read the information of rolled multiple event log files.
201+
*
202+
* This reader lists the files only once; if caller would like to play with updated list,
203+
* it needs to create another reader instance.
204+
*/
205+
class RollingEventLogFilesFileReader(
206+
fs: FileSystem,
207+
path: Path) extends EventLogFileReader(fs, path) {
208+
import RollingEventLogFilesWriter._
209+
210+
private lazy val files: Seq[FileStatus] = {
211+
val ret = fs.listStatus(rootPath).toSeq
212+
require(ret.exists(isEventLogFile), "Log directory must contain at least one event log file!")
213+
require(ret.exists(isAppStatusFile), "Log directory must contain an appstatus file!")
214+
ret
215+
}
216+
217+
private lazy val appStatusFile = files.find(isAppStatusFile).get
218+
219+
private lazy val eventLogFiles: Seq[FileStatus] = {
220+
val eventLogFiles = files.filter(isEventLogFile).sortBy { status =>
221+
getIndex(status.getPath.getName)
222+
}
223+
val indices = eventLogFiles.map { file => getIndex(file.getPath.getName) }.sorted
224+
require((indices.head to indices.last) == indices, "Found missing event log file, expected" +
225+
s" indices: ${(indices.head to indices.last)}, actual: ${indices}")
226+
eventLogFiles
227+
}
228+
229+
override def lastIndex: Option[Long] = Some(getIndex(lastEventLogFile.getPath.getName))
230+
231+
override def fileSizeForLastIndex: Long = lastEventLogFile.getLen
232+
233+
override def completed: Boolean = {
234+
!appStatusFile.getPath.getName.endsWith(EventLogFileWriter.IN_PROGRESS)
235+
}
236+
237+
override def fileSizeForLastIndexForDFS: Option[Long] = {
238+
if (completed) {
239+
Some(fileSizeForLastIndex)
240+
} else {
241+
fileSizeForDFS(lastEventLogFile.getPath)
242+
}
243+
}
244+
245+
override def modificationTime: Long = lastEventLogFile.getModificationTime
246+
247+
override def zipEventLogFiles(zipStream: ZipOutputStream): Unit = {
248+
val dirEntryName = rootPath.getName + "/"
249+
zipStream.putNextEntry(new ZipEntry(dirEntryName))
250+
files.foreach { file =>
251+
addFileAsZipEntry(zipStream, file.getPath, dirEntryName + file.getPath.getName)
252+
}
253+
}
254+
255+
override def listEventLogFiles: Seq[FileStatus] = eventLogFiles
256+
257+
override def compressionCodec: Option[String] = {
258+
EventLogFileWriter.codecName(eventLogFiles.head.getPath)
259+
}
260+
261+
override def totalSize: Long = eventLogFiles.map(_.getLen).sum
262+
263+
private def lastEventLogFile: FileStatus = eventLogFiles.last
264+
}

0 commit comments

Comments
 (0)