[SPARK-47579][CORE][PART4] Migrate logInfo with variables to structured logging framework

### What changes were proposed in this pull request?

This PR migrates `logInfo` calls with variables in the Core module to the structured logging framework.
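As a minimal illustration of the pattern this PR series applies (the class and method below are hypothetical; `Logging`, `MDC`, the `log` interpolator, and `LogKeys.PATH` are the real APIs exercised in the diff):

```scala
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.PATH

// Hypothetical caller, shown only to contrast the old and new logging styles.
class EventLogCleaner extends Logging {
  def deleteExpired(path: String): Unit = {
    // Before: plain string interpolation; the variable is baked into the message text.
    // logInfo(s"Deleting expired event log $path")

    // After: the `log` interpolator plus MDC tags the variable with a LogKey,
    // so structured log pipelines can extract it as a queryable field.
    logInfo(log"Deleting expired event log ${MDC(PATH, path)}")
  }
}
```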

### Why are the changes needed?

To enhance Apache Spark's logging system by implementing structured logging.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

- Passes existing GitHub Actions (GA) checks.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#46724 from zeotuan/coreInfo3.

Lead-authored-by: Tuan Pham <Tuan.Pham@wisetechglobal.com>
Co-authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
zeotuan and gengliangwang committed May 30, 2024
1 parent 51d4f64 commit 772d7d3
Showing 16 changed files with 167 additions and 103 deletions.
common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala (14 additions, 0 deletions)
@@ -59,9 +59,12 @@ trait LogKey {
*/
object LogKeys {
case object ACCUMULATOR_ID extends LogKey
case object ACL_ENABLED extends LogKey
case object ACTUAL_NUM_FILES extends LogKey
case object ACTUAL_PARTITION_COLUMN extends LogKey
case object ADDED_JARS extends LogKey
case object ADMIN_ACLS extends LogKey
case object ADMIN_ACL_GROUPS extends LogKey
case object AGGREGATE_FUNCTIONS extends LogKey
case object ALPHA extends LogKey
case object ANALYSIS_ERROR extends LogKey
@@ -72,7 +75,9 @@ object LogKeys {
case object APP_ID extends LogKey
case object APP_NAME extends LogKey
case object APP_STATE extends LogKey
case object ARCHIVE_NAME extends LogKey
case object ARGS extends LogKey
case object ATTRIBUTE_MAP extends LogKey
case object AUTH_ENABLED extends LogKey
case object BACKUP_FILE extends LogKey
case object BARRIER_EPOCH extends LogKey
@@ -226,6 +231,7 @@ object LogKeys {
case object EXECUTOR_SHUFFLE_INFO extends LogKey
case object EXECUTOR_STATE extends LogKey
case object EXECUTOR_TIMEOUT extends LogKey
case object EXECUTOR_USER_CLASS_PATH_FIRST extends LogKey
case object EXEC_AMOUNT extends LogKey
case object EXISTING_FILE extends LogKey
case object EXISTING_PATH extends LogKey
@@ -283,10 +289,12 @@ object LogKeys {
case object HIVE_OPERATION_TYPE extends LogKey
case object HOST extends LogKey
case object HOST_LOCAL_BLOCKS_SIZE extends LogKey
case object HOST_NAME extends LogKey
case object HOST_NAMES extends LogKey
case object HOST_PORT extends LogKey
case object HOST_PORT2 extends LogKey
case object HUGE_METHOD_LIMIT extends LogKey
case object HYBRID_STORE_DISK_BACKEND extends LogKey
case object IDENTIFIER extends LogKey
case object INCOMPATIBLE_TYPES extends LogKey
case object INDEX extends LogKey
@@ -359,6 +367,7 @@ object LogKeys {
case object MAX_METHOD_CODE_SIZE extends LogKey
case object MAX_NUM_BINS extends LogKey
case object MAX_NUM_CHUNKS extends LogKey
case object MAX_NUM_FILES extends LogKey
case object MAX_NUM_LOG_POLICY extends LogKey
case object MAX_NUM_PARTITIONS extends LogKey
case object MAX_NUM_POSSIBLE_BINS extends LogKey
@@ -481,6 +490,7 @@ object LogKeys {
case object NUM_PRUNED extends LogKey
case object NUM_PUSH_MERGED_LOCAL_BLOCKS extends LogKey
case object NUM_RECORDS_READ extends LogKey
case object NUM_RELEASED_LOCKS extends LogKey
case object NUM_REMAINED extends LogKey
case object NUM_REMOTE_BLOCKS extends LogKey
case object NUM_REMOVED_WORKERS extends LogKey
@@ -599,11 +609,13 @@ object LogKeys {
case object RECOVERY_STATE extends LogKey
case object REDACTED_STATEMENT extends LogKey
case object REDUCE_ID extends LogKey
case object REGEX extends LogKey
case object REGISTERED_EXECUTOR_FILE extends LogKey
case object REGISTER_MERGE_RESULTS extends LogKey
case object RELATION_NAME extends LogKey
case object RELATION_OUTPUT extends LogKey
case object RELATIVE_TOLERANCE extends LogKey
case object RELEASED_LOCKS extends LogKey
case object REMAINING_PARTITIONS extends LogKey
case object REMOTE_ADDRESS extends LogKey
case object REMOTE_BLOCKS_SIZE extends LogKey
@@ -721,6 +733,7 @@ object LogKeys {
case object TASK_LOCALITY extends LogKey
case object TASK_NAME extends LogKey
case object TASK_REQUIREMENTS extends LogKey
case object TASK_RESOURCES extends LogKey
case object TASK_RESOURCE_ASSIGNMENTS extends LogKey
case object TASK_SET_ID extends LogKey
case object TASK_SET_MANAGER extends LogKey
@@ -777,6 +790,7 @@ object LogKeys {
case object URIS extends LogKey
case object URL extends LogKey
case object URL2 extends LogKey
case object URLS extends LogKey
case object USER_ID extends LogKey
case object USER_NAME extends LogKey
case object UUID extends LogKey
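The keys added above are plain case objects extending the `LogKey` trait; call sites reference them through `MDC`, and longer messages are assembled by concatenating `log` fragments. A small sketch of that usage (the object below is illustrative only; the message mirrors the `NUM_FILES`/`MAX_NUM_FILES` line in the FsHistoryProvider changes that follow):

```scala
import org.apache.spark.internal.{Logging, LogKeys, MDC}

// Illustrative helper; the message mirrors FsHistoryProvider's log-retention message below.
object CleanupLogExample extends Logging {
  def logRetention(count: Long, maxNum: Long): Unit = {
    // Each MDC pairs a LogKeys entry with the value filling that placeholder;
    // `+` concatenates the structured message fragments into one entry.
    logInfo(log"Try to delete ${MDC(LogKeys.NUM_FILES, count)} old event logs" +
      log" to keep ${MDC(LogKeys.MAX_NUM_FILES, maxNum)} logs in total.")
  }
}
```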
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -44,6 +44,7 @@ import org.apache.spark.internal.config._
import org.apache.spark.internal.config.History._
import org.apache.spark.internal.config.Status._
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.internal.config.UI
import org.apache.spark.internal.config.UI._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.ReplayListenerBus._
@@ -107,9 +108,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private val historyUiAclsEnable = conf.get(History.HISTORY_SERVER_UI_ACLS_ENABLE)
private val historyUiAdminAcls = conf.get(History.HISTORY_SERVER_UI_ADMIN_ACLS)
private val historyUiAdminAclsGroups = conf.get(History.HISTORY_SERVER_UI_ADMIN_ACLS_GROUPS)
logInfo(s"History server ui acls " + (if (historyUiAclsEnable) "enabled" else "disabled") +
"; users with admin permissions: " + historyUiAdminAcls.mkString(",") +
"; groups with admin permissions: " + historyUiAdminAclsGroups.mkString(","))
logInfo(log"History server ui acls" +
log" ${MDC(ACL_ENABLED, if (historyUiAclsEnable) "enabled" else "disabled")}" +
log"; users with admin permissions:" +
log" ${MDC(LogKeys.ADMIN_ACLS, historyUiAdminAcls.mkString(","))}" +
log"; groups with admin permissions:" +
log" ${MDC(ADMIN_ACL_GROUPS, historyUiAdminAclsGroups.mkString(","))}")

private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
// Visible for testing
@@ -482,8 +486,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
true
} catch {
case e: IllegalArgumentException =>
logInfo("Exception in getting modificationTime of "
+ reader.rootPath.getName + ". " + e.toString)
logInfo(log"Exception in getting modificationTime of"
+ log" ${MDC(PATH, reader.rootPath.getName)}. ${MDC(EXCEPTION, e.toString)}")
false
}
}
@@ -550,7 +554,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
try {
if (conf.get(CLEANER_ENABLED) && reader.modificationTime <
clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000) {
logInfo(s"Deleting expired event log ${reader.rootPath.toString}")
logInfo(log"Deleting expired event log ${MDC(PATH, reader.rootPath.toString)}")
deleteLog(fs, reader.rootPath)
// If the LogInfo read had succeeded, but the ApplicationInfoWrapper
// read failed and threw an exception, we should also clean up the log
@@ -801,7 +805,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val listener = new AppListingListener(reader, clock, shouldHalt)
bus.addListener(listener)

logInfo(s"Parsing $logPath for listing data...")
logInfo(log"Parsing ${MDC(PATH, logPath)} for listing data...")
val logFiles = reader.listEventLogFiles
parseAppEventLogs(logFiles, bus, !appCompleted, eventsFilter)

@@ -829,7 +833,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
Utils.tryWithResource(EventLogFileReader.openEventLog(lastFile.getPath, fs)) { in =>
val target = lastFile.getLen - reparseChunkSize
if (target > 0) {
logInfo(s"Looking for end event; skipping $target bytes from $logPath...")
logInfo(log"Looking for end event; skipping ${MDC(NUM_BYTES, target)} bytes" +
log" from ${MDC(PATH, logPath)}...")
var skipped = 0L
while (skipped < target) {
skipped += in.skip(target - skipped)
@@ -848,7 +853,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

logInfo(s"Finished parsing $logPath")
logInfo(log"Finished parsing ${MDC(PATH, logPath)}")

listener.applicationInfo match {
case Some(app) if !lookForEndEvent || app.attempts.head.info.completed =>
@@ -883,7 +888,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// In this case, the attempt is still not marked as finished but was expected to. This can
// mean the end event is before the configured threshold, so call the method again to
// re-parse the whole log.
logInfo(s"Reparsing $logPath since end event was not found.")
logInfo(log"Reparsing ${MDC(PATH, logPath)} since end event was not found.")
doMergeApplicationListingInternal(reader, scanTime, enableOptimizations = false,
lastEvaluatedForCompaction)

@@ -952,7 +957,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val log = listing.read(classOf[LogInfo], logPath)

if (log.lastProcessed <= maxTime && log.appId.isEmpty) {
logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
logInfo(log"Deleting invalid / corrupt event log ${MDC(PATH, log.logPath)}")
deleteLog(fs, new Path(log.logPath))
listing.delete(classOf[LogInfo], log.logPath)
}
@@ -994,7 +999,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
.first(maxTime), Int.MaxValue) { l => l.logType == null || l.logType == LogType.EventLogs }
stale.filterNot(isProcessing).foreach { log =>
if (log.appId.isEmpty) {
logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
logInfo(log"Deleting invalid / corrupt event log ${MDC(PATH, log.logPath)}")
deleteLog(fs, new Path(log.logPath))
listing.delete(classOf[LogInfo], log.logPath)
}
@@ -1005,7 +1010,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val num = KVUtils.size(listing.view(classOf[LogInfo]).index("lastProcessed"))
var count = num - maxNum
if (count > 0) {
logInfo(s"Try to delete $count old event logs to keep $maxNum logs in total.")
logInfo(log"Try to delete ${MDC(NUM_FILES, count)} old event logs" +
log" to keep ${MDC(MAX_NUM_FILES, maxNum)} logs in total.")
KVUtils.foreach(listing.view(classOf[ApplicationInfoWrapper]).index("oldestAttempt")) { app =>
if (count > 0) {
// Applications may have multiple attempts, some of which may not be completed yet.
@@ -1034,7 +1040,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

var countDeleted = 0
toDelete.foreach { attempt =>
logInfo(s"Deleting expired event log for ${attempt.logPath}")
logInfo(log"Deleting expired event log for ${MDC(PATH, attempt.logPath)}")
val logPath = new Path(logDir, attempt.logPath)
listing.delete(classOf[LogInfo], logPath.toString())
cleanAppData(app.id, attempt.info.attemptId, logPath.toString())
@@ -1082,7 +1088,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
false
}
if (deleteFile) {
logInfo(s"Deleting expired driver log for: $logFileStr")
logInfo(log"Deleting expired driver log for: ${MDC(PATH, logFileStr)}")
listing.delete(classOf[LogInfo], logFileStr)
deleteLog(driverLogFs, f.getPath())
}
@@ -1095,7 +1101,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
.reverse()
.first(maxTime), Int.MaxValue) { l => l.logType != null && l.logType == LogType.DriverLogs }
stale.filterNot(isProcessing).foreach { log =>
logInfo(s"Deleting invalid driver log ${log.logPath}")
logInfo(log"Deleting invalid driver log ${MDC(PATH, log.logPath)}")
listing.delete(classOf[LogInfo], log.logPath)
deleteLog(driverLogFs, new Path(log.logPath))
}
@@ -1124,10 +1130,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

try {
val eventLogFiles = reader.listEventLogFiles
logInfo(s"Parsing ${reader.rootPath} to re-build UI...")
logInfo(log"Parsing ${MDC(PATH, reader.rootPath)} to re-build UI...")
parseAppEventLogs(eventLogFiles, replayBus, !reader.completed)
trackingStore.close(false)
logInfo(s"Finished parsing ${reader.rootPath}")
logInfo(log"Finished parsing ${MDC(PATH, reader.rootPath)}")
} catch {
case e: Exception =>
Utils.tryLogNonFatalError {
@@ -1228,7 +1234,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
return KVUtils.open(path, metadata, conf, live = false)
} catch {
case e: Exception =>
logInfo(s"Failed to open existing store for $appId/${attempt.info.attemptId}.", e)
logInfo(log"Failed to open existing store for" +
log" ${MDC(APP_ID, appId)}/${MDC(LogKeys.APP_ATTEMPT_ID, attempt.info.attemptId)}.", e)
dm.release(appId, attempt.info.attemptId, delete = true)
}
}
@@ -1244,11 +1251,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
case e: RuntimeException if e.getMessage != null &&
e.getMessage.contains("Not enough memory to create hybrid") =>
// Handle exception from `HistoryServerMemoryManager.lease`.
logInfo(s"Failed to create HybridStore for $appId/${attempt.info.attemptId}." +
s" Using $hybridStoreDiskBackend. " + e.getMessage)
logInfo(log"Failed to create HybridStore for" +
log" ${MDC(APP_ID, appId)}/${MDC(LogKeys.APP_ATTEMPT_ID, attempt.info.attemptId)}." +
log" Using ${MDC(LogKeys.HYBRID_STORE_DISK_BACKEND, hybridStoreDiskBackend)}." +
log" ${MDC(EXCEPTION, e.getMessage)}")
case e: Exception =>
logInfo(s"Failed to create HybridStore for $appId/${attempt.info.attemptId}." +
s" Using $hybridStoreDiskBackend.", e)
logInfo(log"Failed to create HybridStore for" +
log" ${MDC(APP_ID, appId)}/${MDC(LogKeys.APP_ATTEMPT_ID, attempt.info.attemptId)}." +
log" Using ${MDC(LogKeys.HYBRID_STORE_DISK_BACKEND, hybridStoreDiskBackend)}.", e)
}
}

@@ -1295,13 +1305,15 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// Create a disk-base KVStore and start a background thread to dump data to it
var lease: dm.Lease = null
try {
logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
logInfo(log"Leasing disk manager space for app" +
log" ${MDC(APP_ID, appId)} / ${MDC(LogKeys.APP_ATTEMPT_ID, attempt.info.attemptId)}...")
lease = dm.lease(reader.totalSize, reader.compressionCodec.isDefined)
val diskStore = KVUtils.open(lease.tmpPath, metadata, conf, live = false)
hybridStore.setDiskStore(diskStore)
hybridStore.switchToDiskStore(new HybridStore.SwitchToDiskStoreListener {
override def onSwitchToDiskStoreSuccess(): Unit = {
logInfo(s"Completely switched to diskStore for app $appId / ${attempt.info.attemptId}.")
logInfo(log"Completely switched to diskStore for app" +
log" ${MDC(APP_ID, appId)} / ${MDC(LogKeys.APP_ATTEMPT_ID, attempt.info.attemptId)}.")
diskStore.close()
val newStorePath = lease.commit(appId, attempt.info.attemptId)
hybridStore.setDiskStore(KVUtils.open(newStorePath, metadata, conf, live = false))
@@ -1338,7 +1350,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
attempt.lastIndex)
val isCompressed = reader.compressionCodec.isDefined
logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
logInfo(log"Leasing disk manager space for app" +
log" ${MDC(APP_ID, appId)} / ${MDC(LogKeys.APP_ATTEMPT_ID, attempt.info.attemptId)}...")
val lease = dm.lease(reader.totalSize, isCompressed)
try {
Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata, conf, live = false)) { store =>
@@ -1408,7 +1421,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
deleted = fs.delete(log, true)
} catch {
case _: AccessControlException =>
logInfo(s"No permission to delete $log, ignoring.")
logInfo(log"No permission to delete ${MDC(PATH, log)}, ignoring.")
case ioe: IOException =>
logError(log"IOException in cleaning ${MDC(PATH, log)}", ioe)
}
@@ -1560,7 +1573,7 @@ private[history] class AppListingListener(

val allProperties = event.environmentDetails("Spark Properties").toMap
attempt.viewAcls = emptyStringToNone(allProperties.get(UI_VIEW_ACLS.key))
attempt.adminAcls = emptyStringToNone(allProperties.get(ADMIN_ACLS.key))
attempt.adminAcls = emptyStringToNone(allProperties.get(UI.ADMIN_ACLS.key))
attempt.viewAclsGroups = emptyStringToNone(allProperties.get(UI_VIEW_ACLS_GROUPS.key))
attempt.adminAclsGroups = emptyStringToNone(allProperties.get(ADMIN_ACLS_GROUPS.key))

core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
@@ -27,6 +27,7 @@ import org.apache.commons.io.FileUtils
import org.apache.spark.SparkConf
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.internal.config.History
import org.apache.spark.internal.config.History._
import org.apache.spark.internal.config.History.HybridStoreDiskBackend.ROCKSDB
import org.apache.spark.status.KVUtils
@@ -58,7 +59,7 @@ private class HistoryServerDiskManager(
throw new IllegalArgumentException(s"Failed to create app directory ($appStoreDir).")
}
private val extension =
if (conf.get(HYBRID_STORE_DISK_BACKEND) == ROCKSDB.toString) ".rdb" else ".ldb"
if (conf.get(History.HYBRID_STORE_DISK_BACKEND) == ROCKSDB.toString) ".rdb" else ".ldb"

private val tmpStoreDir = new File(path, "temp")
if (!tmpStoreDir.isDirectory() && !tmpStoreDir.mkdir()) {
(Diff for the remaining changed files not shown.)
