[SPARK-28294][CORE] Support spark.history.fs.cleaner.maxNum configuration

## What changes were proposed in this pull request?

Up to now, Apache Spark has maintained the given event log directory with a **time**-based policy only, `spark.history.fs.cleaner.maxAge`. However, there are two issues.
1. Some file systems have a limit on the maximum number of files in a single directory. For example, HDFS's `dfs.namenode.fs-limits.max-directory-items` is 1024 * 1024 by default.
https://hadoop.apache.org/docs/r3.2.0/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml
2. Spark is sometimes unable to clean up some old log files due to permission issues (mainly caused by security policies).

To handle both (1) and (2), this PR adds a policy configuration for the maximum number of files in the event log directory, `spark.history.fs.cleaner.maxNum`. Spark will try to keep the number of files in the event log directory within this limit.
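
For illustration only (not part of this PR's diff), here is a minimal sketch of enabling both cleaner policies on the History Server; the event log directory and the concrete values are placeholder assumptions:

```scala
// Sketch: History Server cleaner with both the time-based and the new
// count-based policy. Directory path and values are illustrative only.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.history.fs.logDirectory", "hdfs:///spark-events")  // example event log dir
  .set("spark.history.fs.cleaner.enabled", "true")               // turn the cleaner on
  .set("spark.history.fs.cleaner.maxAge", "7d")                  // existing time-based policy
  .set("spark.history.fs.cleaner.maxNum", "100000")              // new count-based policy
```

With both set, the cleaner first removes attempts older than `maxAge` and then, if the event log directory still contains more than `maxNum` entries, deletes completed attempts starting from the applications with the oldest attempt time.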

## How was this patch tested?

Passes the Jenkins tests with a newly added test case.

Closes apache#25072 from dongjoon-hyun/SPARK-28294.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
dongjoon-hyun committed Jul 10, 2019
1 parent 90c64ea commit bbc2be4
Showing 4 changed files with 125 additions and 20 deletions.
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -805,6 +805,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
*/
private[history] def cleanLogs(): Unit = Utils.tryLog {
val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000
val maxNum = conf.get(MAX_LOG_NUM)

val expired = listing.view(classOf[ApplicationInfoWrapper])
.index("oldestAttempt")
@@ -817,23 +818,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val (remaining, toDelete) = app.attempts.partition { attempt =>
attempt.info.lastUpdated.getTime() >= maxTime
}

if (remaining.nonEmpty) {
val newApp = new ApplicationInfoWrapper(app.info, remaining)
listing.write(newApp)
}

toDelete.foreach { attempt =>
logInfo(s"Deleting expired event log for ${attempt.logPath}")
val logPath = new Path(logDir, attempt.logPath)
listing.delete(classOf[LogInfo], logPath.toString())
cleanAppData(app.id, attempt.info.attemptId, logPath.toString())
deleteLog(fs, logPath)
}

if (remaining.isEmpty) {
listing.delete(app.getClass(), app.id)
}
deleteAttemptLogs(app, remaining, toDelete)
}

// Delete log files that don't have a valid application and exceed the configured max age.
@@ -851,10 +836,59 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
listing.delete(classOf[LogInfo], log.logPath)
}
}

// If the number of files is bigger than MAX_LOG_NUM,
// clean up all completed attempts per application one by one.
val num = listing.view(classOf[LogInfo]).index("lastProcessed").asScala.size
var count = num - maxNum
if (count > 0) {
logInfo(s"Try to delete $count old event logs to keep $maxNum logs in total.")
val oldAttempts = listing.view(classOf[ApplicationInfoWrapper])
.index("oldestAttempt")
.asScala
oldAttempts.foreach { app =>
if (count > 0) {
// Applications may have multiple attempts, some of which may not be completed yet.
val (toDelete, remaining) = app.attempts.partition(_.info.completed)
count -= deleteAttemptLogs(app, remaining, toDelete)
}
}
if (count > 0) {
logWarning(s"Fail to clean up according to MAX_LOG_NUM policy ($maxNum).")
}
}

// Clean the blacklist from the expired entries.
clearBlacklist(CLEAN_INTERVAL_S)
}

private def deleteAttemptLogs(
app: ApplicationInfoWrapper,
remaining: List[AttemptInfoWrapper],
toDelete: List[AttemptInfoWrapper]): Int = {
if (remaining.nonEmpty) {
val newApp = new ApplicationInfoWrapper(app.info, remaining)
listing.write(newApp)
}

var countDeleted = 0
toDelete.foreach { attempt =>
logInfo(s"Deleting expired event log for ${attempt.logPath}")
val logPath = new Path(logDir, attempt.logPath)
listing.delete(classOf[LogInfo], logPath.toString())
cleanAppData(app.id, attempt.info.attemptId, logPath.toString())
if (deleteLog(fs, logPath)) {
countDeleted += 1
}
}

if (remaining.isEmpty) {
listing.delete(app.getClass(), app.id)
}

countDeleted
}

/**
* Delete driver logs from the configured spark dfs dir that exceed the configured max age
*/
@@ -1066,19 +1100,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
throw new NoSuchElementException(s"Cannot find attempt $attemptId of $appId."))
}

private def deleteLog(fs: FileSystem, log: Path): Unit = {
private def deleteLog(fs: FileSystem, log: Path): Boolean = {
var deleted = false
if (isBlacklisted(log)) {
logDebug(s"Skipping deleting $log as we don't have permissions on it.")
} else {
try {
fs.delete(log, true)
deleted = fs.delete(log, true)
} catch {
case _: AccessControlException =>
logInfo(s"No permission to delete $log, ignoring.")
case ioe: IOException =>
logError(s"IOException in cleaning $log", ioe)
}
}
deleted
}

private def isCompleted(name: String): Boolean = {
core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -49,6 +49,11 @@ private[spark] object History {
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("7d")

val MAX_LOG_NUM = ConfigBuilder("spark.history.fs.cleaner.maxNum")
.doc("The maximum number of log files in the event log directory.")
.intConf
.createWithDefault(Int.MaxValue)

val LOCAL_STORE_DIR = ConfigBuilder("spark.history.store.path")
.doc("Local directory where to cache application history information. By default this is " +
"not set, meaning all history information will be kept in memory.")
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -1185,6 +1185,56 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
assert(!mockedProvider.shouldReloadLog(logInfo, fileStatus))
}

test("log cleaner with the maximum number of log files") {
val clock = new ManualClock(0)
(5 to 0 by -1).foreach { num =>
val log1_1 = newLogFile("app1", Some("attempt1"), inProgress = false)
writeFile(log1_1, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
SparkListenerApplicationEnd(2L)
)
log1_1.setLastModified(2L)

val log2_1 = newLogFile("app2", Some("attempt1"), inProgress = false)
writeFile(log2_1, true, None,
SparkListenerApplicationStart("app2", Some("app2"), 3L, "test", Some("attempt1")),
SparkListenerApplicationEnd(4L)
)
log2_1.setLastModified(4L)

val log3_1 = newLogFile("app3", Some("attempt1"), inProgress = false)
writeFile(log3_1, true, None,
SparkListenerApplicationStart("app3", Some("app3"), 5L, "test", Some("attempt1")),
SparkListenerApplicationEnd(6L)
)
log3_1.setLastModified(6L)

val log1_2_incomplete = newLogFile("app1", Some("attempt2"), inProgress = false)
writeFile(log1_2_incomplete, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 7L, "test", Some("attempt2"))
)
log1_2_incomplete.setLastModified(8L)

val log3_2 = newLogFile("app3", Some("attempt2"), inProgress = false)
writeFile(log3_2, true, None,
SparkListenerApplicationStart("app3", Some("app3"), 9L, "test", Some("attempt2")),
SparkListenerApplicationEnd(10L)
)
log3_2.setLastModified(10L)

val provider = new FsHistoryProvider(createTestConf().set(MAX_LOG_NUM.key, s"$num"), clock)
updateAndCheck(provider) { list =>
assert(log1_1.exists() == (num > 4))
assert(log1_2_incomplete.exists()) // Always exists for all configurations

assert(log2_1.exists() == (num > 3))

assert(log3_1.exists() == (num > 2))
assert(log3_2.exists() == (num > 2))
}
}
}

/**
* Asks the provider to check for logs and calls a function to perform checks on the updated
* app list. Example:
16 changes: 15 additions & 1 deletion docs/monitoring.md
@@ -190,7 +190,11 @@ Security options for the Spark History Server are covered more detail in the
<td>1d</td>
<td>
How often the filesystem job history cleaner checks for files to delete.
Files are only deleted if they are older than <code>spark.history.fs.cleaner.maxAge</code>
Files are deleted if at least one of two conditions holds.
First, they are deleted if they are older than <code>spark.history.fs.cleaner.maxAge</code>.
Second, they are deleted if the number of files exceeds
<code>spark.history.fs.cleaner.maxNum</code>; in that case, Spark tries to clean up the completed
attempts of the applications in order of their oldest attempt time.
</td>
</tr>
<tr>
@@ -200,6 +204,16 @@ Security options for the Spark History Server are covered more detail in the
Job history files older than this will be deleted when the filesystem history cleaner runs.
</td>
</tr>
<tr>
<td>spark.history.fs.cleaner.maxNum</td>
<td>Int.MaxValue</td>
<td>
The maximum number of files in the event log directory.
Spark tries to clean up the completed attempt logs to keep the log directory under this limit.
This should be smaller than the underlying file system limit, such as
<code>dfs.namenode.fs-limits.max-directory-items</code> in HDFS.
</td>
</tr>
<tr>
<td>spark.history.fs.endEventReparseChunkSize</td>
<td>1m</td>
