Commit 761833b

Address incremental load on constructing EventFilterBuilder
1 parent 872ffbc commit 761833b

5 files changed: +368 −120 lines changed


core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala

Lines changed: 95 additions & 37 deletions
@@ -21,13 +21,15 @@ import java.io.IOException
 import java.net.URI
 import java.util.ServiceLoader
 
+import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.history.EventFilter.FilterStatistic
+import org.apache.spark.deploy.history.EventFilterBuildersLoader.LowerIndexLoadRequested
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{EVENT_LOG_COMPACTION_SCORE_THRESHOLD, EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN}
 import org.apache.spark.scheduler.ReplayListenerBus
@@ -46,10 +48,6 @@ import org.apache.spark.util.Utils
  * represents approximate rate of filtered-out events. Score is being calculated via applying
  * heuristic; task events tend to take most size in event log.
  *
- * This class assumes caller will provide the sorted list of files which are sorted by the index of
- * event log file, with "at most" one compact file placed first if it exists. Caller should keep in
- * mind that this class doesn't care about the semantic of ordering.
- *
  * When compacting the files, the range of compaction for given file list is determined as:
  * (first ~ the file where there're `maxFilesToRetain` files on the right side)
  *
@@ -59,22 +57,43 @@ class EventLogFileCompactor(
     sparkConf: SparkConf,
     hadoopConf: Configuration,
     fs: FileSystem) extends Logging {
+  import EventFilterBuildersLoader._
+
   private val maxFilesToRetain: Int = sparkConf.get(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN)
   private val compactionThresholdScore: Double = sparkConf.get(EVENT_LOG_COMPACTION_SCORE_THRESHOLD)
 
-  def compact(eventLogFiles: Seq[FileStatus]): (CompactionResult.Value, Option[Long]) = {
-    assertPrecondition(eventLogFiles)
+  private var filterBuildersLoader = new EventFilterBuildersLoader(fs)
+  private var loadedLogPath: Path = _
+
+  def compact(reader: EventLogFileReader): (CompactionResult.Value, Option[Long]) = {
+    doCompact(reader)
+  }
+
+  @tailrec
+  private def doCompact(reader: EventLogFileReader): (CompactionResult.Value, Option[Long]) = {
+    if (loadedLogPath == null) {
+      loadedLogPath = reader.rootPath
+    } else {
+      require(loadedLogPath == null || reader.rootPath == loadedLogPath,
+        "An instance of compactor should deal with same path of event log.")
+    }
+
+    if (reader.lastIndex.isEmpty) {
+      return (CompactionResult.NOT_ENOUGH_FILES, None)
+    }
 
+    val eventLogFiles = reader.listEventLogFiles
     if (eventLogFiles.length < maxFilesToRetain) {
       return (CompactionResult.NOT_ENOUGH_FILES, None)
     }
 
     val filesToCompact = findFilesToCompact(eventLogFiles)
     if (filesToCompact.isEmpty) {
-      (CompactionResult.NOT_ENOUGH_FILES, None)
-    } else {
-      val builders = initializeBuilders(fs, filesToCompact.map(_.getPath))
+      return (CompactionResult.NOT_ENOUGH_FILES, None)
+    }
 
+    try {
+      val builders = filterBuildersLoader.loadNewFiles(filesToCompact)
       val filters = builders.map(_.createFilter())
       val minScore = filters.flatMap(_.statistic()).map(calculateScore).min
 
@@ -87,37 +106,14 @@ class EventLogFileCompactor(
         (CompactionResult.SUCCESS, Some(RollingEventLogFilesWriter.getEventLogFileIndex(
           filesToCompact.last.getPath.getName)))
       }
+    } catch {
+      case _: LowerIndexLoadRequested =>
+        // reset loader and load again
+        filterBuildersLoader = new EventFilterBuildersLoader(fs)
+        doCompact(reader)
     }
   }
 
-  private def assertPrecondition(eventLogFiles: Seq[FileStatus]): Unit = {
-    val idxCompactedFiles = eventLogFiles.zipWithIndex.filter { case (file, _) =>
-      EventLogFileWriter.isCompacted(file.getPath)
-    }
-    require(idxCompactedFiles.size < 2 && idxCompactedFiles.headOption.forall(_._2 == 0),
-      "The number of compact files should be at most 1, and should be placed first if exists.")
-  }
-
-  /**
-   * Loads all available EventFilterBuilders in classloader via ServiceLoader, and initializes
-   * them via replaying events in given files.
-   */
-  private def initializeBuilders(fs: FileSystem, files: Seq[Path]): Seq[EventFilterBuilder] = {
-    val bus = new ReplayListenerBus()
-
-    val builders = ServiceLoader.load(classOf[EventFilterBuilder],
-      Utils.getContextOrSparkClassLoader).asScala.toSeq
-    builders.foreach(bus.addListener)
-
-    files.foreach { log =>
-      Utils.tryWithResource(EventLogFileReader.openEventLog(log, fs)) { in =>
-        bus.replay(in, log.getName)
-      }
-    }
-
-    builders
-  }
-
   private def calculateScore(stats: FilterStatistic): Double = {
     // For now it's simply measuring how many task events will be filtered out (rejected)
     // but it can be sophisticated later once we get more heuristic information and found
@@ -162,6 +158,68 @@ object CompactionResult extends Enumeration {
   val SUCCESS, NOT_ENOUGH_FILES, LOW_SCORE_FOR_COMPACTION = Value
 }
 
+class EventFilterBuildersLoader(fs: FileSystem) {
+  // the implementation of this bus is expected to be stateless
+  private val bus = new ReplayListenerBus()
+
+  /** Loads all available EventFilterBuilders in classloader via ServiceLoader */
+  private val filterBuilders: Seq[EventFilterBuilder] = ServiceLoader.load(
+    classOf[EventFilterBuilder], Utils.getContextOrSparkClassLoader).asScala.toSeq
+
+  filterBuilders.foreach(bus.addListener)
+
+  private var latestIndexLoaded: Long = -1L
+
+  /** only exposed for testing; simple metric to help testing */
+  private[history] var numFilesToLoad: Long = 0L
+
+  /**
+   * Initializes EventFilterBuilders via replaying events in given files. Loading files are done
+   * incrementally, via dropping indices which are already loaded and replaying remaining files.
+   * For example, if the last index of requested files is same as the last index being loaded,
+   * this will not replay any files.
+   *
+   * If the last index of requested files is smaller than the last index being loaded, it will
+   * throw [[LowerIndexLoadRequested]], which caller can decide whether ignoring it or
+   * invalidating loader and retrying.
+   */
+  def loadNewFiles(eventLogFiles: Seq[FileStatus]): Seq[EventFilterBuilder] = {
+    require(eventLogFiles.nonEmpty)
+
+    val idxToStatuses = eventLogFiles.map { status =>
+      val idx = RollingEventLogFilesWriter.getEventLogFileIndex(status.getPath.getName)
+      idx -> status
+    }
+
+    val newLatestIdx = idxToStatuses.last._1
+    if (newLatestIdx < latestIndexLoaded) {
+      throw new LowerIndexLoadRequested("Loader already loads higher index of event log than" +
+        " requested.")
+    }
+
+    val filesToLoad = idxToStatuses
+      .filter { case (idx, _) => idx > latestIndexLoaded }
+      .map { case (_, status) => status.getPath }
+
+    if (filesToLoad.nonEmpty) {
+      filesToLoad.foreach { log =>
+        Utils.tryWithResource(EventLogFileReader.openEventLog(log, fs)) { in =>
+          bus.replay(in, log.getName)
+        }
+        numFilesToLoad += 1
+      }
+
+      latestIndexLoaded = newLatestIdx
+    }
+
+    filterBuilders
+  }
+}
+
+object EventFilterBuildersLoader {
+  class LowerIndexLoadRequested(_msg: String) extends Exception(_msg)
+}
+
 /**
  * This class rewrites the event log files into one compact file: the compact file will only
  * contain the events which pass the filters. Events will be dropped only when all filters
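To make the incremental-load behavior above concrete, here is a minimal, self-contained sketch (not part of the commit) of the bookkeeping EventFilterBuildersLoader introduces: it remembers the highest event log index already replayed, replays only files with a higher index on the next call, and raises LowerIndexLoadRequested when a caller asks for a lower last index so the compactor can reset the loader and retry. The IncrementalLoader name and the bare Long indices are illustrative stand-ins; the real loader replays FileStatus entries through a stateless ReplayListenerBus and returns the initialized EventFilterBuilder instances.

// Simplified model of the bookkeeping in EventFilterBuildersLoader (illustrative only).
// "Replaying" a file is reduced to counting it; Long indices stand in for rolled log files.
class LowerIndexLoadRequested(msg: String) extends Exception(msg)

class IncrementalLoader {
  private var latestIndexLoaded: Long = -1L
  var numFilesLoaded: Long = 0L // mirrors numFilesToLoad in the diff

  def loadNewFiles(indices: Seq[Long]): Unit = {
    require(indices.nonEmpty)
    val newLatestIdx = indices.last
    if (newLatestIdx < latestIndexLoaded) {
      throw new LowerIndexLoadRequested("Loader already loaded a higher index than requested.")
    }
    val toLoad = indices.filter(_ > latestIndexLoaded)
    toLoad.foreach(_ => numFilesLoaded += 1) // stands in for bus.replay(...)
    if (toLoad.nonEmpty) {
      latestIndexLoaded = newLatestIdx
    }
  }
}

object IncrementalLoaderDemo extends App {
  val loader = new IncrementalLoader
  loader.loadNewFiles(Seq(1L, 2L, 3L))         // first call replays 3 files
  loader.loadNewFiles(Seq(1L, 2L, 3L, 4L, 5L)) // second call replays only files 4 and 5
  assert(loader.numFilesLoaded == 5)
  try {
    loader.loadNewFiles(Seq(1L, 2L))           // lower last index than already loaded
  } catch {
    // EventLogFileCompactor.doCompact reacts by recreating the loader and retrying
    case _: LowerIndexLoadRequested => println("reset loader and retry")
  }
}

This mirrors the design choice in the diff: a compactor reuses its loader across repeated compact() calls on the same rolling log, so a growing log only costs replaying the newly appended files.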

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 42 additions & 18 deletions
@@ -159,7 +159,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     new HistoryServerDiskManager(conf, path, listing, clock)
   }
 
-  private val fileCompactor = new EventLogFileCompactor(conf, hadoopConf, fs)
+  // Visible for testing.
+  private[history] val logToCompactor = new mutable.HashMap[String, EventLogFileCompactor]
 
   // Used to store the paths, which are being processed. This enable the replay log tasks execute
   // asynchronously and make sure that checkForLogs would not process a path repeatedly.
@@ -526,20 +527,26 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           reader.fileSizeForLastIndex > 0
         }
       }
-      .sortWith { case (entry1, entry2) =>
-        entry1.modificationTime > entry2.modificationTime
+      .sortWith { case (reader1, reader2) =>
+        reader1.modificationTime > reader2.modificationTime
       }
 
     if (updated.nonEmpty) {
       logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.rootPath)}")
     }
 
-    updated.foreach { entry =>
-      processing(entry.rootPath)
+    updated.foreach { reader =>
+      processing(reader.rootPath)
       try {
         val task: Runnable = () => {
-          val updatedLastCompactionIndex = compact(entry)
-          mergeApplicationListing(entry, newLastScanTime, true, updatedLastCompactionIndex)
+          val (shouldRenewReader, updatedLastCompactionIndex) = compact(reader)
+          // we should renew reader if the list of event log files are changed in `compact`
+          val newReader = if (shouldRenewReader) {
+            EventLogFileReader(fs, reader.rootPath).get
+          } else {
+            reader
+          }
+          mergeApplicationListing(newReader, newLastScanTime, true, updatedLastCompactionIndex)
         }
         replayExecutor.submit(task)
       } catch {
@@ -566,6 +573,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       log.appId.foreach { appId =>
         cleanAppData(appId, log.attemptId, log.logPath)
         listing.delete(classOf[LogInfo], log.logPath)
+        cleanupCompactor(log.logPath)
       }
     }
 
@@ -576,26 +584,33 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   }
 
   /** exposed for testing */
-  private[history] def compact(reader: EventLogFileReader): Option[Long] = {
+  private[history] def compact(reader: EventLogFileReader): (Boolean, Option[Long]) = {
     reader.lastIndex match {
       case Some(lastIndex) =>
         try {
-          val info = listing.read(classOf[LogInfo], reader.rootPath.toString)
+          val rootPath = reader.rootPath.toString
+          val info = listing.read(classOf[LogInfo], rootPath)
           if (info.lastCompactionIndex.isEmpty || info.lastCompactionIndex.get < lastIndex) {
             // haven't tried compaction for this index, do compaction
-            val (_, lastCompactionIndex) = fileCompactor.compact(reader.listEventLogFiles)
-            listing.write(info.copy(lastCompactionIndex = lastCompactionIndex))
-            Some(lastIndex)
+            val compactor = logToCompactor.getOrElseUpdate(rootPath,
+              new EventLogFileCompactor(conf, hadoopConf, fs))
+            val (compactionResult, lastCompactionIndex) = compactor.compact(reader)
+            if (compactionResult == CompactionResult.SUCCESS) {
+              listing.write(info.copy(lastCompactionIndex = lastCompactionIndex))
+              (true, Some(lastIndex))
+            } else {
+              (false, Some(lastIndex))
+            }
           } else {
-            info.lastCompactionIndex
+            (false, info.lastCompactionIndex)
           }
         } catch {
           case _: NoSuchElementException =>
             // this should exist, but ignoring doesn't hurt much
-            None
+            (false, None)
         }
 
-      case None => None // This is not applied to single event log file.
+      case None => (false, None) // This is not applied to single event log file.
     }
   }
 
@@ -702,12 +717,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       case e: InterruptedException =>
         throw e
       case e: AccessControlException =>
+        val rootPath = reader.rootPath
         // We don't have read permissions on the log file
-        logWarning(s"Unable to read log ${reader.rootPath}", e)
-        blacklist(reader.rootPath)
+        logWarning(s"Unable to read log ${rootPath}", e)
+        blacklist(rootPath)
         // SPARK-28157 We should remove this blacklisted entry from the KVStore
         // to handle permission-only changes with the same file sizes later.
-        listing.delete(classOf[LogInfo], reader.rootPath.toString)
+        listing.delete(classOf[LogInfo], rootPath.toString)
+        cleanupCompactor(rootPath.toString)
       case e: Exception =>
         logError("Exception while merging application listings", e)
     } finally {
@@ -864,6 +881,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
         deleteLog(fs, new Path(log.logPath))
         listing.delete(classOf[LogInfo], log.logPath)
+        cleanupCompactor(log.logPath)
       }
 
       log.appId.foreach { appId =>
@@ -911,6 +929,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
         deleteLog(fs, new Path(log.logPath))
         listing.delete(classOf[LogInfo], log.logPath)
+        cleanupCompactor(log.logPath)
       }
     }
 
@@ -953,6 +972,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       logInfo(s"Deleting expired event log for ${attempt.logPath}")
       val logPath = new Path(logDir, attempt.logPath)
       listing.delete(classOf[LogInfo], logPath.toString())
+      cleanupCompactor(logPath.toString)
       cleanAppData(app.id, attempt.info.attemptId, logPath.toString())
       if (deleteLog(fs, logPath)) {
         countDeleted += 1
@@ -1240,6 +1260,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
     deleted
   }
+
+  private def cleanupCompactor(logPath: String): Unit = {
+    logToCompactor -= logPath
+  }
 }
 
 private[history] object FsHistoryProvider {
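Because each compactor now carries replay state bound to a single rootPath (see the require in doCompact above), FsHistoryProvider keeps one instance per log path in logToCompactor and drops it through cleanupCompactor when the log is blacklisted, invalidated, or expired. Below is a condensed, self-contained sketch of that lifecycle (not part of the commit); StatefulCompactor and the path string are hypothetical stand-ins for EventLogFileCompactor and a real rolling event log directory.

import scala.collection.mutable

// Hypothetical stand-in for EventLogFileCompactor: stateful and tied to one log path.
class StatefulCompactor {
  var compactRounds: Int = 0
}

object CompactorRegistryDemo extends App {
  // Mirrors FsHistoryProvider.logToCompactor in the diff.
  val logToCompactor = new mutable.HashMap[String, StatefulCompactor]

  def compact(logPath: String): Unit = {
    // Reuse the compactor for this path so already-replayed files are not replayed again.
    val compactor = logToCompactor.getOrElseUpdate(logPath, new StatefulCompactor)
    compactor.compactRounds += 1
  }

  // Mirrors FsHistoryProvider.cleanupCompactor in the diff.
  def cleanupCompactor(logPath: String): Unit = logToCompactor -= logPath

  val path = "hdfs:///history/eventlog_v2_app-0001" // hypothetical log path
  compact(path)
  compact(path) // same instance: incremental replay state is preserved across scans
  assert(logToCompactor(path).compactRounds == 2)

  // When the log is deleted, blacklisted, or expired, the entry is removed so the
  // stateful compactor does not leak.
  cleanupCompactor(path)
  assert(!logToCompactor.contains(path))
}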
