From 772d7d3aba17ca251b9464503963cc30635f69c0 Mon Sep 17 00:00:00 2001 From: Tuan Pham Date: Wed, 29 May 2024 17:35:47 -0700 Subject: [PATCH] [SPARK-47579][CORE][PART4] Migrate logInfo with variables to structured logging framework The PR aims to migrate `logInfo` in Core module with variables to structured logging framework. ### Why are the changes needed? To enhance Apache Spark's logging system by implementing structured logging. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Pass GA. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46724 from zeotuan/coreInfo3. Lead-authored-by: Tuan Pham Co-authored-by: Gengliang Wang Signed-off-by: Gengliang Wang --- .../org/apache/spark/internal/LogKey.scala | 14 ++++ .../deploy/history/FsHistoryProvider.scala | 69 +++++++++++-------- .../history/HistoryServerDiskManager.scala | 3 +- .../CoarseGrainedExecutorBackend.scala | 18 ++--- .../org/apache/spark/executor/Executor.scala | 66 +++++++++++------- .../executor/ExecutorLogUrlHandler.scala | 9 ++- .../io/HadoopMapRedCommitProtocol.scala | 6 +- .../spark/internal/io/SparkHadoopWriter.scala | 7 +- .../internal/plugin/PluginContainer.scala | 19 ++--- .../ResourceDiscoveryScriptPlugin.scala | 6 +- .../spark/resource/ResourceProfile.scala | 8 +-- .../resource/ResourceProfileManager.scala | 5 +- .../spark/status/AppStatusListener.scala | 4 +- .../apache/spark/status/AppStatusStore.scala | 2 +- .../scala/org/apache/spark/util/Utils.scala | 29 ++++---- .../collection/ExternalAppendOnlyMap.scala | 5 +- 16 files changed, 167 insertions(+), 103 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 1366277827f75..cf31569059663 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -59,9 +59,12 @@ trait LogKey { */ object LogKeys { case object ACCUMULATOR_ID extends LogKey + case object ACL_ENABLED extends LogKey case object ACTUAL_NUM_FILES extends LogKey case object ACTUAL_PARTITION_COLUMN extends LogKey case object ADDED_JARS extends LogKey + case object ADMIN_ACLS extends LogKey + case object ADMIN_ACL_GROUPS extends LogKey case object AGGREGATE_FUNCTIONS extends LogKey case object ALPHA extends LogKey case object ANALYSIS_ERROR extends LogKey @@ -72,7 +75,9 @@ object LogKeys { case object APP_ID extends LogKey case object APP_NAME extends LogKey case object APP_STATE extends LogKey + case object ARCHIVE_NAME extends LogKey case object ARGS extends LogKey + case object ATTRIBUTE_MAP extends LogKey case object AUTH_ENABLED extends LogKey case object BACKUP_FILE extends LogKey case object BARRIER_EPOCH extends LogKey @@ -226,6 +231,7 @@ object LogKeys { case object EXECUTOR_SHUFFLE_INFO extends LogKey case object EXECUTOR_STATE extends LogKey case object EXECUTOR_TIMEOUT extends LogKey + case object EXECUTOR_USER_CLASS_PATH_FIRST extends LogKey case object EXEC_AMOUNT extends LogKey case object EXISTING_FILE extends LogKey case object EXISTING_PATH extends LogKey @@ -283,10 +289,12 @@ object LogKeys { case object HIVE_OPERATION_TYPE extends LogKey case object HOST extends LogKey case object HOST_LOCAL_BLOCKS_SIZE extends LogKey + case object HOST_NAME extends LogKey case object HOST_NAMES extends LogKey case object HOST_PORT extends LogKey case object HOST_PORT2 extends LogKey case object HUGE_METHOD_LIMIT extends LogKey 
+ case object HYBRID_STORE_DISK_BACKEND extends LogKey case object IDENTIFIER extends LogKey case object INCOMPATIBLE_TYPES extends LogKey case object INDEX extends LogKey @@ -359,6 +367,7 @@ object LogKeys { case object MAX_METHOD_CODE_SIZE extends LogKey case object MAX_NUM_BINS extends LogKey case object MAX_NUM_CHUNKS extends LogKey + case object MAX_NUM_FILES extends LogKey case object MAX_NUM_LOG_POLICY extends LogKey case object MAX_NUM_PARTITIONS extends LogKey case object MAX_NUM_POSSIBLE_BINS extends LogKey @@ -481,6 +490,7 @@ object LogKeys { case object NUM_PRUNED extends LogKey case object NUM_PUSH_MERGED_LOCAL_BLOCKS extends LogKey case object NUM_RECORDS_READ extends LogKey + case object NUM_RELEASED_LOCKS extends LogKey case object NUM_REMAINED extends LogKey case object NUM_REMOTE_BLOCKS extends LogKey case object NUM_REMOVED_WORKERS extends LogKey @@ -599,11 +609,13 @@ object LogKeys { case object RECOVERY_STATE extends LogKey case object REDACTED_STATEMENT extends LogKey case object REDUCE_ID extends LogKey + case object REGEX extends LogKey case object REGISTERED_EXECUTOR_FILE extends LogKey case object REGISTER_MERGE_RESULTS extends LogKey case object RELATION_NAME extends LogKey case object RELATION_OUTPUT extends LogKey case object RELATIVE_TOLERANCE extends LogKey + case object RELEASED_LOCKS extends LogKey case object REMAINING_PARTITIONS extends LogKey case object REMOTE_ADDRESS extends LogKey case object REMOTE_BLOCKS_SIZE extends LogKey @@ -721,6 +733,7 @@ object LogKeys { case object TASK_LOCALITY extends LogKey case object TASK_NAME extends LogKey case object TASK_REQUIREMENTS extends LogKey + case object TASK_RESOURCES extends LogKey case object TASK_RESOURCE_ASSIGNMENTS extends LogKey case object TASK_SET_ID extends LogKey case object TASK_SET_MANAGER extends LogKey @@ -777,6 +790,7 @@ object LogKeys { case object URIS extends LogKey case object URL extends LogKey case object URL2 extends LogKey + case object URLS extends LogKey case object USER_ID extends LogKey case object USER_NAME extends LogKey case object UUID extends LogKey diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index bfd96167b1691..92ac10ac9fb81 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -44,6 +44,7 @@ import org.apache.spark.internal.config._ import org.apache.spark.internal.config.History._ import org.apache.spark.internal.config.Status._ import org.apache.spark.internal.config.Tests.IS_TESTING +import org.apache.spark.internal.config.UI import org.apache.spark.internal.config.UI._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.ReplayListenerBus._ @@ -107,9 +108,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private val historyUiAclsEnable = conf.get(History.HISTORY_SERVER_UI_ACLS_ENABLE) private val historyUiAdminAcls = conf.get(History.HISTORY_SERVER_UI_ADMIN_ACLS) private val historyUiAdminAclsGroups = conf.get(History.HISTORY_SERVER_UI_ADMIN_ACLS_GROUPS) - logInfo(s"History server ui acls " + (if (historyUiAclsEnable) "enabled" else "disabled") + - "; users with admin permissions: " + historyUiAdminAcls.mkString(",") + - "; groups with admin permissions: " + historyUiAdminAclsGroups.mkString(",")) + logInfo(log"History server ui acls" + + log" ${MDC(ACL_ENABLED, if (historyUiAclsEnable) 
"enabled" else "disabled")}" + + log"; users with admin permissions:" + + log" ${MDC(LogKeys.ADMIN_ACLS, historyUiAdminAcls.mkString(","))}" + + log"; groups with admin permissions:" + + log" ${MDC(ADMIN_ACL_GROUPS, historyUiAdminAclsGroups.mkString(","))}") private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) // Visible for testing @@ -482,8 +486,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) true } catch { case e: IllegalArgumentException => - logInfo("Exception in getting modificationTime of " - + reader.rootPath.getName + ". " + e.toString) + logInfo(log"Exception in getting modificationTime of" + + log" ${MDC(PATH, reader.rootPath.getName)}. ${MDC(EXCEPTION, e.toString)}") false } } @@ -550,7 +554,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) try { if (conf.get(CLEANER_ENABLED) && reader.modificationTime < clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000) { - logInfo(s"Deleting expired event log ${reader.rootPath.toString}") + logInfo(log"Deleting expired event log ${MDC(PATH, reader.rootPath.toString)}") deleteLog(fs, reader.rootPath) // If the LogInfo read had succeeded, but the ApplicationInafoWrapper // read failure and throw the exception, we should also cleanup the log @@ -801,7 +805,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val listener = new AppListingListener(reader, clock, shouldHalt) bus.addListener(listener) - logInfo(s"Parsing $logPath for listing data...") + logInfo(log"Parsing ${MDC(PATH, logPath)} for listing data...") val logFiles = reader.listEventLogFiles parseAppEventLogs(logFiles, bus, !appCompleted, eventsFilter) @@ -829,7 +833,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) Utils.tryWithResource(EventLogFileReader.openEventLog(lastFile.getPath, fs)) { in => val target = lastFile.getLen - reparseChunkSize if (target > 0) { - logInfo(s"Looking for end event; skipping $target bytes from $logPath...") + logInfo(log"Looking for end event; skipping ${MDC(NUM_BYTES, target)} bytes" + + log" from ${MDC(PATH, logPath)}...") var skipped = 0L while (skipped < target) { skipped += in.skip(target - skipped) @@ -848,7 +853,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - logInfo(s"Finished parsing $logPath") + logInfo(log"Finished parsing ${MDC(PATH, logPath)}") listener.applicationInfo match { case Some(app) if !lookForEndEvent || app.attempts.head.info.completed => @@ -883,7 +888,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // In this case, the attempt is still not marked as finished but was expected to. This can // mean the end event is before the configured threshold, so call the method again to // re-parse the whole log. 
- logInfo(s"Reparsing $logPath since end event was not found.") + logInfo(log"Reparsing ${MDC(PATH, logPath)} since end event was not found.") doMergeApplicationListingInternal(reader, scanTime, enableOptimizations = false, lastEvaluatedForCompaction) @@ -952,7 +957,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val log = listing.read(classOf[LogInfo], logPath) if (log.lastProcessed <= maxTime && log.appId.isEmpty) { - logInfo(s"Deleting invalid / corrupt event log ${log.logPath}") + logInfo(log"Deleting invalid / corrupt event log ${MDC(PATH, log.logPath)}") deleteLog(fs, new Path(log.logPath)) listing.delete(classOf[LogInfo], log.logPath) } @@ -994,7 +999,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) .first(maxTime), Int.MaxValue) { l => l.logType == null || l.logType == LogType.EventLogs } stale.filterNot(isProcessing).foreach { log => if (log.appId.isEmpty) { - logInfo(s"Deleting invalid / corrupt event log ${log.logPath}") + logInfo(log"Deleting invalid / corrupt event log ${MDC(PATH, log.logPath)}") deleteLog(fs, new Path(log.logPath)) listing.delete(classOf[LogInfo], log.logPath) } @@ -1005,7 +1010,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val num = KVUtils.size(listing.view(classOf[LogInfo]).index("lastProcessed")) var count = num - maxNum if (count > 0) { - logInfo(s"Try to delete $count old event logs to keep $maxNum logs in total.") + logInfo(log"Try to delete ${MDC(NUM_FILES, count)} old event logs" + + log" to keep ${MDC(MAX_NUM_FILES, maxNum)} logs in total.") KVUtils.foreach(listing.view(classOf[ApplicationInfoWrapper]).index("oldestAttempt")) { app => if (count > 0) { // Applications may have multiple attempts, some of which may not be completed yet. 
@@ -1034,7 +1040,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) var countDeleted = 0 toDelete.foreach { attempt => - logInfo(s"Deleting expired event log for ${attempt.logPath}") + logInfo(log"Deleting expired event log for ${MDC(PATH, attempt.logPath)}") val logPath = new Path(logDir, attempt.logPath) listing.delete(classOf[LogInfo], logPath.toString()) cleanAppData(app.id, attempt.info.attemptId, logPath.toString()) @@ -1082,7 +1088,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) false } if (deleteFile) { - logInfo(s"Deleting expired driver log for: $logFileStr") + logInfo(log"Deleting expired driver log for: ${MDC(PATH, logFileStr)}") listing.delete(classOf[LogInfo], logFileStr) deleteLog(driverLogFs, f.getPath()) } @@ -1095,7 +1101,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) .reverse() .first(maxTime), Int.MaxValue) { l => l.logType != null && l.logType == LogType.DriverLogs } stale.filterNot(isProcessing).foreach { log => - logInfo(s"Deleting invalid driver log ${log.logPath}") + logInfo(log"Deleting invalid driver log ${MDC(PATH, log.logPath)}") listing.delete(classOf[LogInfo], log.logPath) deleteLog(driverLogFs, new Path(log.logPath)) } @@ -1124,10 +1130,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) try { val eventLogFiles = reader.listEventLogFiles - logInfo(s"Parsing ${reader.rootPath} to re-build UI...") + logInfo(log"Parsing ${MDC(PATH, reader.rootPath)} to re-build UI...") parseAppEventLogs(eventLogFiles, replayBus, !reader.completed) trackingStore.close(false) - logInfo(s"Finished parsing ${reader.rootPath}") + logInfo(log"Finished parsing ${MDC(PATH, reader.rootPath)}") } catch { case e: Exception => Utils.tryLogNonFatalError { @@ -1228,7 +1234,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) return KVUtils.open(path, metadata, conf, live = false) } catch { case e: Exception => - logInfo(s"Failed to open existing store for $appId/${attempt.info.attemptId}.", e) + logInfo(log"Failed to open existing store for" + + log" ${MDC(APP_ID, appId)}/${MDC(LogKeys.APP_ATTEMPT_ID, attempt.info.attemptId)}.", e) dm.release(appId, attempt.info.attemptId, delete = true) } } @@ -1244,11 +1251,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) case e: RuntimeException if e.getMessage != null && e.getMessage.contains("Not enough memory to create hybrid") => // Handle exception from `HistoryServerMemoryManager.lease`. - logInfo(s"Failed to create HybridStore for $appId/${attempt.info.attemptId}." + - s" Using $hybridStoreDiskBackend. " + e.getMessage) + logInfo(log"Failed to create HybridStore for" + + log" ${MDC(APP_ID, appId)}/${MDC(LogKeys.APP_ATTEMPT_ID, attempt.info.attemptId)}." + + log" Using ${MDC(LogKeys.HYBRID_STORE_DISK_BACKEND, hybridStoreDiskBackend)}." + + log" ${MDC(EXCEPTION, e.getMessage)}") case e: Exception => - logInfo(s"Failed to create HybridStore for $appId/${attempt.info.attemptId}." + - s" Using $hybridStoreDiskBackend.", e) + logInfo(log"Failed to create HybridStore for" + + log" ${MDC(APP_ID, appId)}/${MDC(LogKeys.APP_ATTEMPT_ID, attempt.info.attemptId)}." 
+ + log" Using ${MDC(LogKeys.HYBRID_STORE_DISK_BACKEND, hybridStoreDiskBackend)}.", e) } } @@ -1295,13 +1305,15 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // Create a disk-base KVStore and start a background thread to dump data to it var lease: dm.Lease = null try { - logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...") + logInfo(log"Leasing disk manager space for app" + + log" ${MDC(APP_ID, appId)} / ${MDC(LogKeys.APP_ATTEMPT_ID, attempt.info.attemptId)}...") lease = dm.lease(reader.totalSize, reader.compressionCodec.isDefined) val diskStore = KVUtils.open(lease.tmpPath, metadata, conf, live = false) hybridStore.setDiskStore(diskStore) hybridStore.switchToDiskStore(new HybridStore.SwitchToDiskStoreListener { override def onSwitchToDiskStoreSuccess(): Unit = { - logInfo(s"Completely switched to diskStore for app $appId / ${attempt.info.attemptId}.") + logInfo(log"Completely switched to diskStore for app" + + log" ${MDC(APP_ID, appId)} / ${MDC(LogKeys.APP_ATTEMPT_ID, attempt.info.attemptId)}.") diskStore.close() val newStorePath = lease.commit(appId, attempt.info.attemptId) hybridStore.setDiskStore(KVUtils.open(newStorePath, metadata, conf, live = false)) @@ -1338,7 +1350,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath), attempt.lastIndex) val isCompressed = reader.compressionCodec.isDefined - logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...") + logInfo(log"Leasing disk manager space for app" + + log" ${MDC(APP_ID, appId)} / ${MDC(LogKeys.APP_ATTEMPT_ID, attempt.info.attemptId)}...") val lease = dm.lease(reader.totalSize, isCompressed) try { Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata, conf, live = false)) { store => @@ -1408,7 +1421,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) deleted = fs.delete(log, true) } catch { case _: AccessControlException => - logInfo(s"No permission to delete $log, ignoring.") + logInfo(log"No permission to delete ${MDC(PATH, log)}, ignoring.") case ioe: IOException => logError(log"IOException in cleaning ${MDC(PATH, log)}", ioe) } @@ -1560,7 +1573,7 @@ private[history] class AppListingListener( val allProperties = event.environmentDetails("Spark Properties").toMap attempt.viewAcls = emptyStringToNone(allProperties.get(UI_VIEW_ACLS.key)) - attempt.adminAcls = emptyStringToNone(allProperties.get(ADMIN_ACLS.key)) + attempt.adminAcls = emptyStringToNone(allProperties.get(UI.ADMIN_ACLS.key)) attempt.viewAclsGroups = emptyStringToNone(allProperties.get(UI_VIEW_ACLS_GROUPS.key)) attempt.adminAclsGroups = emptyStringToNone(allProperties.get(ADMIN_ACLS_GROUPS.key)) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala index 56faa75962e60..122ed299242f5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala @@ -27,6 +27,7 @@ import org.apache.commons.io.FileUtils import org.apache.spark.SparkConf import org.apache.spark.internal.{Logging, MDC} import org.apache.spark.internal.LogKeys._ +import org.apache.spark.internal.config.History import org.apache.spark.internal.config.History._ import org.apache.spark.internal.config.History.HybridStoreDiskBackend.ROCKSDB import 
org.apache.spark.status.KVUtils @@ -58,7 +59,7 @@ private class HistoryServerDiskManager( throw new IllegalArgumentException(s"Failed to create app directory ($appStoreDir).") } private val extension = - if (conf.get(HYBRID_STORE_DISK_BACKEND) == ROCKSDB.toString) ".rdb" else ".ldb" + if (conf.get(History.HYBRID_STORE_DISK_BACKEND) == ROCKSDB.toString) ".rdb" else ".ldb" private val tmpStoreDir = new File(path, "temp") if (!tmpStoreDir.isDirectory() && !tmpStoreDir.mkdir()) { diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 21b30cf854e68..1b1053a7013e0 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -74,13 +74,14 @@ private[spark] class CoarseGrainedExecutorBackend( override def onStart(): Unit = { if (env.conf.get(DECOMMISSION_ENABLED)) { val signal = env.conf.get(EXECUTOR_DECOMMISSION_SIGNAL) - logInfo(s"Registering SIG$signal handler to trigger decommissioning.") + logInfo(log"Registering SIG${MDC(LogKeys.SIGNAL, signal)}" + + log" handler to trigger decommissioning.") SignalUtils.register(signal, log"Failed to register SIG${MDC(LogKeys.SIGNAL, signal)} " + log"handler - disabling executor decommission feature.")( self.askSync[Boolean](ExecutorDecommissionSigReceived)) } - logInfo("Connecting to driver: " + driverUrl) + logInfo(log"Connecting to driver: ${MDC(LogKeys.URL, driverUrl)}" ) try { val securityManager = new SecurityManager(env.conf) val shuffleClientTransportConf = SparkTransportConf.fromSparkConf( @@ -182,7 +183,7 @@ private[spark] class CoarseGrainedExecutorBackend( exitExecutor(1, "Received LaunchTask command but executor was null") } else { val taskDesc = TaskDescription.decode(data.value) - logInfo("Got assigned task " + taskDesc.taskId) + logInfo(log"Got assigned task ${MDC(LogKeys.TASK_ID, taskDesc.taskId)}") executor.launchTask(this, taskDesc) } @@ -219,7 +220,7 @@ private[spark] class CoarseGrainedExecutorBackend( }.start() case UpdateDelegationTokens(tokenBytes) => - logInfo(s"Received tokens of ${tokenBytes.length} bytes") + logInfo(log"Received tokens of ${MDC(LogKeys.NUM_BYTES, tokenBytes.length)} bytes") SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf) case DecommissionExecutor => @@ -252,7 +253,8 @@ private[spark] class CoarseGrainedExecutorBackend( override def onDisconnected(remoteAddress: RpcAddress): Unit = { if (stopping.get()) { - logInfo(s"Driver from $remoteAddress disconnected during shutdown") + logInfo(log"Driver from ${MDC(LogKeys.RPC_ADDRESS, remoteAddress)}" + + log" disconnected during shutdown") } else if (driver.exists(_.address == remoteAddress)) { exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.", null, notifyDriver = false) @@ -315,8 +317,7 @@ private[spark] class CoarseGrainedExecutorBackend( log"already started decommissioning.") return } - val msg = s"Decommission executor $executorId." 
- logInfo(msg) + logInfo(log"Decommission executor ${MDC(LogKeys.EXECUTOR_ID, executorId)}.") try { decommissioned = true val migrationEnabled = env.conf.get(STORAGE_DECOMMISSION_ENABLED) && @@ -369,7 +370,8 @@ private[spark] class CoarseGrainedExecutorBackend( exitExecutor(0, ExecutorLossMessage.decommissionFinished, notifyDriver = true) } } else { - logInfo(s"Blocked from shutdown by ${executor.numRunningTasks} running tasks") + logInfo(log"Blocked from shutdown by" + + log" ${MDC(LogKeys.NUM_TASKS, executor.numRunningTasks)} running tasks") } Thread.sleep(sleep_time) } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 68c38fb6179f5..4e5d151468d8c 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -43,6 +43,7 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.{Logging, LogKeys, MDC => LogMDC} import org.apache.spark.internal.LogKeys._ import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.{EXECUTOR_USER_CLASS_PATH_FIRST => EXECUTOR_USER_CLASS_PATH_FIRST_CONFIG} import org.apache.spark.internal.plugin.PluginContainer import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager} import org.apache.spark.metrics.source.JVMCPUSource @@ -81,10 +82,12 @@ private[spark] class Executor( resources: immutable.Map[String, ResourceInformation]) extends Logging { - logInfo(s"Starting executor ID $executorId on host $executorHostname") - logInfo(s"OS info ${System.getProperty("os.name")}, ${System.getProperty("os.version")}, " + - s"${System.getProperty("os.arch")}") - logInfo(s"Java version ${System.getProperty("java.version")}") + logInfo(log"Starting executor ID ${LogMDC(LogKeys.EXECUTOR_ID, executorId)}" + + log" on host ${LogMDC(HOST_NAME, executorHostname)}") + logInfo(log"OS info ${LogMDC(OS_NAME, System.getProperty("os.name"))}," + + log" ${LogMDC(OS_VERSION, System.getProperty("os.version"))}, " + + log"${LogMDC(OS_ARCH, System.getProperty("os.arch"))}") + logInfo(log"Java version ${LogMDC(JAVA_VERSION, System.getProperty("java.version"))}") private val executorShutdown = new AtomicBoolean(false) val stopHookReference = ShutdownHookManager.addShutdownHook( @@ -169,7 +172,7 @@ private[spark] class Executor( } // Whether to load classes in user jars before those in Spark jars - private val userClassPathFirst = conf.get(EXECUTOR_USER_CLASS_PATH_FIRST) + private val userClassPathFirst = conf.get(EXECUTOR_USER_CLASS_PATH_FIRST_CONFIG) // Whether to monitor killed / interrupted tasks private val taskReaperEnabled = conf.get(TASK_REAPER_ENABLED) @@ -219,7 +222,7 @@ private[spark] class Executor( if (sessionBasedRoot.isDirectory && sessionBasedRoot.exists()) { Utils.deleteRecursively(sessionBasedRoot) } - logInfo(s"Session evicted: ${state.sessionUUID}") + logInfo(log"Session evicted: ${LogMDC(SESSION_ID, state.sessionUUID)}") } }) .build[String, IsolatedSessionState] @@ -501,7 +504,8 @@ private[spark] class Executor( @volatile var task: Task[Any] = _ def kill(interruptThread: Boolean, reason: String): Unit = { - logInfo(s"Executor is trying to kill $taskName, reason: $reason") + logInfo(log"Executor is trying to kill ${LogMDC(TASK_NAME, taskName)}," + + log" reason: ${LogMDC(REASON, reason)}") reasonIfKilled = Some(reason) if (task != null) { synchronized { @@ -572,7 +576,7 @@ private[spark] class Executor( } else 0L 
Thread.currentThread.setContextClassLoader(isolatedSession.replClassLoader) val ser = env.closureSerializer.newInstance() - logInfo(s"Running $taskName") + logInfo(log"Running ${LogMDC(TASK_NAME, taskName)}") execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER) var taskStartTimeNs: Long = 0 var taskStartCpu: Long = 0 @@ -656,10 +660,11 @@ private[spark] class Executor( if (releasedLocks.nonEmpty && !threwException) { val errMsg = - s"${releasedLocks.size} block locks were not released by $taskName\n" + - releasedLocks.mkString("[", ", ", "]") + log"${LogMDC(NUM_RELEASED_LOCKS, releasedLocks.size)} block locks" + + log" were not released by ${LogMDC(TASK_NAME, taskName)}\n" + + log" ${LogMDC(RELEASED_LOCKS, releasedLocks.mkString("[", ", ", "]"))})" if (conf.get(STORAGE_EXCEPTION_PIN_LEAK)) { - throw SparkException.internalError(errMsg, category = "EXECUTOR") + throw SparkException.internalError(errMsg.message, category = "EXECUTOR") } else { logInfo(errMsg) } @@ -747,10 +752,12 @@ private[spark] class Executor( blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER) - logInfo(s"Finished $taskName. $resultSize bytes result sent via BlockManager)") + logInfo(log"Finished ${LogMDC(TASK_NAME, taskName)}." + + log" ${LogMDC(NUM_BYTES, resultSize)} bytes result sent via BlockManager)") ser.serialize(new IndirectTaskResult[Any](blockId, resultSize)) } else { - logInfo(s"Finished $taskName. $resultSize bytes result sent to driver") + logInfo(log"Finished ${LogMDC(TASK_NAME, taskName)}." + + log" ${LogMDC(NUM_BYTES, resultSize)} bytes result sent to driver") // toByteBuffer is safe here, guarded by maxDirectResultSize serializedDirectResult.toByteBuffer } @@ -762,7 +769,8 @@ private[spark] class Executor( execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult) } catch { case t: TaskKilledException => - logInfo(s"Executor killed $taskName, reason: ${t.reason}") + logInfo(log"Executor killed ${LogMDC(TASK_NAME, taskName)}," + + log" reason: ${LogMDC(REASON, t.reason)}") val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs) // Here and below, put task metric peaks in an immutable.ArraySeq to expose them as an @@ -775,7 +783,8 @@ private[spark] class Executor( case _: InterruptedException | NonFatal(_) if task != null && task.reasonIfKilled.isDefined => val killReason = task.reasonIfKilled.getOrElse("unknown reason") - logInfo(s"Executor interrupted and killed $taskName, reason: $killReason") + logInfo(log"Executor interrupted and killed ${LogMDC(TASK_NAME, taskName)}," + + log" reason: ${LogMDC(REASON, killReason)}") val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs) val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId).toImmutableArraySeq @@ -1078,8 +1087,10 @@ private[spark] class Executor( private def createClassLoader(urls: Array[URL], useStub: Boolean): MutableURLClassLoader = { logInfo( - s"Starting executor with user classpath (userClassPathFirst = $userClassPathFirst): " + - urls.mkString("'", ",", "'") + log"Starting executor with user classpath" + + log" (userClassPathFirst =" + + log" ${LogMDC(LogKeys.EXECUTOR_USER_CLASS_PATH_FIRST, userClassPathFirst)}): " + + log"${LogMDC(URLS, urls.mkString("'", ",", "'"))}" ) if (useStub) { @@ -1123,12 +1134,13 @@ private[spark] class Executor( sessionUUID: String): ClassLoader = { val classUri = sessionClassUri.getOrElse(conf.get("spark.repl.class.uri", null)) val classLoader = if (classUri != null) { - logInfo("Using REPL class URI: " 
+ classUri) + logInfo(log"Using REPL class URI: ${LogMDC(LogKeys.URI, classUri)}") new ExecutorClassLoader(conf, env, classUri, parent, userClassPathFirst) } else { parent } - logInfo(s"Created or updated repl class loader $classLoader for $sessionUUID.") + logInfo(log"Created or updated repl class loader ${LogMDC(CLASS_LOADER, classLoader)}" + + log" for ${LogMDC(SESSION_ID, sessionUUID)}.") classLoader } @@ -1163,14 +1175,16 @@ private[spark] class Executor( // Fetch missing dependencies for ((name, timestamp) <- newFiles if state.currentFiles.getOrElse(name, -1L) < timestamp) { - logInfo(s"Fetching $name with timestamp $timestamp") + logInfo(log"Fetching ${LogMDC(FILE_NAME, name)} with" + + log" timestamp ${LogMDC(TIMESTAMP, timestamp)}") // Fetch file with useCache mode, close cache for local mode. Utils.fetchFile(name, root, conf, hadoopConf, timestamp, useCache = !isLocal) state.currentFiles(name) = timestamp } for ((name, timestamp) <- newArchives if state.currentArchives.getOrElse(name, -1L) < timestamp) { - logInfo(s"Fetching $name with timestamp $timestamp") + logInfo(log"Fetching ${LogMDC(ARCHIVE_NAME, name)} with" + + log" timestamp ${LogMDC(TIMESTAMP, timestamp)}") val sourceURI = new URI(name) val uriToDownload = Utils.getUriBuilder(sourceURI).fragment(null).build() val source = Utils.fetchFile(uriToDownload.toString, Utils.createTempDir(), conf, @@ -1179,7 +1193,9 @@ private[spark] class Executor( root, if (sourceURI.getFragment != null) sourceURI.getFragment else source.getName) logInfo( - s"Unpacking an archive $name from ${source.getAbsolutePath} to ${dest.getAbsolutePath}") + log"Unpacking an archive ${LogMDC(ARCHIVE_NAME, name)}" + + log" from ${LogMDC(SOURCE_PATH, source.getAbsolutePath)}" + + log" to ${LogMDC(DESTINATION_PATH, dest.getAbsolutePath)}") Utils.deleteRecursively(dest) Utils.unpack(source, dest) state.currentArchives(name) = timestamp @@ -1190,7 +1206,8 @@ private[spark] class Executor( .orElse(state.currentJars.get(localName)) .getOrElse(-1L) if (currentTimeStamp < timestamp) { - logInfo(s"Fetching $name with timestamp $timestamp") + logInfo(log"Fetching ${LogMDC(JAR_URL, name)} with" + + log" timestamp ${LogMDC(TIMESTAMP, timestamp)}") // Fetch file with useCache mode, close cache for local mode. 
Utils.fetchFile(name, root, conf, hadoopConf, timestamp, useCache = !isLocal) @@ -1198,7 +1215,8 @@ private[spark] class Executor( // Add it to our class loader val url = new File(root, localName).toURI.toURL if (!state.urlClassLoader.getURLs().contains(url)) { - logInfo(s"Adding $url to class loader ${state.sessionUUID}") + logInfo(log"Adding ${LogMDC(LogKeys.URL, url)} to" + + log" class loader ${LogMDC(UUID, state.sessionUUID)}") state.urlClassLoader.addURL(url) if (isStubbingEnabledForState(state.sessionUUID)) { renewClassLoader = true diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorLogUrlHandler.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorLogUrlHandler.scala index 0ddeef8e9a82d..2202489509fc4 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorLogUrlHandler.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorLogUrlHandler.scala @@ -21,7 +21,8 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.util.matching.Regex -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys private[spark] class ExecutorLogUrlHandler(logUrlPattern: Option[String]) extends Logging { import ExecutorLogUrlHandler._ @@ -82,8 +83,10 @@ private[spark] class ExecutorLogUrlHandler(logUrlPattern: Option[String]) extend allPatterns: Set[String], allAttributes: Set[String]): Unit = { if (informedForMissingAttributes.compareAndSet(false, true)) { - logInfo(s"Fail to renew executor log urls: $reason. Required: $allPatterns / " + - s"available: $allAttributes. Falling back to show app's original log urls.") + logInfo(log"Fail to renew executor log urls: ${MDC(LogKeys.REASON, reason)}." + + log" Required: ${MDC(LogKeys.REGEX, allPatterns)} / " + + log"available: ${MDC(LogKeys.ATTRIBUTE_MAP, allAttributes)}." + + log" Falling back to show app's original log urls.") } } } diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapRedCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapRedCommitProtocol.scala index af0aa41518766..44f8d7cd63635 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapRedCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapRedCommitProtocol.scala @@ -20,6 +20,9 @@ package org.apache.spark.internal.io import org.apache.hadoop.mapred._ import org.apache.hadoop.mapreduce.{TaskAttemptContext => NewTaskAttemptContext} +import org.apache.spark.internal.LogKeys +import org.apache.spark.internal.MDC + /** * An [[FileCommitProtocol]] implementation backed by an underlying Hadoop OutputCommitter * (from the old mapred API). 
@@ -32,7 +35,8 @@ class HadoopMapRedCommitProtocol(jobId: String, path: String) override def setupCommitter(context: NewTaskAttemptContext): OutputCommitter = { val config = context.getConfiguration.asInstanceOf[JobConf] val committer = config.getOutputCommitter - logInfo(s"Using output committer class ${committer.getClass.getCanonicalName}") + logInfo(log"Using output committer class" + + log" ${MDC(LogKeys.CLASS_NAME, committer.getClass.getCanonicalName)}") committer } } diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala index 52529ff1d4e96..db961b3c42f4c 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala @@ -33,7 +33,7 @@ import org.apache.hadoop.mapreduce.task.{TaskAttemptContextImpl => NewTaskAttemp import org.apache.spark.{SerializableWritable, SparkConf, SparkException, TaskContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKeys.{JOB_ID, TASK_ATTEMPT_ID} +import org.apache.spark.internal.LogKeys.{DURATION, JOB_ID, TASK_ATTEMPT_ID} import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage import org.apache.spark.rdd.{HadoopRDD, RDD} import org.apache.spark.util.{SerializableConfiguration, SerializableJobConf, Utils} @@ -98,10 +98,11 @@ object SparkHadoopWriter extends Logging { iterator = iter) }) - logInfo(s"Start to commit write Job ${jobContext.getJobID}.") + logInfo(log"Start to commit write Job ${MDC(JOB_ID, jobContext.getJobID)}.") val (_, duration) = Utils .timeTakenMs { committer.commitJob(jobContext, ret.toImmutableArraySeq) } - logInfo(s"Write Job ${jobContext.getJobID} committed. Elapsed time: $duration ms.") + logInfo(log"Write Job ${MDC(JOB_ID, jobContext.getJobID)} committed." 
+ + log" Elapsed time: ${MDC(DURATION, duration)} ms.") } catch { case cause: Throwable => logError(log"Aborting job ${MDC(JOB_ID, jobContext.getJobID)}.", cause) diff --git a/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala index 261e016ce9bf0..a0c07bd75f885 100644 --- a/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala +++ b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala @@ -22,7 +22,7 @@ import scala.util.{Either, Left, Right} import org.apache.spark.{SparkContext, SparkEnv, TaskFailedReason} import org.apache.spark.api.plugin._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.config._ import org.apache.spark.resource.ResourceInformation import org.apache.spark.util.Utils @@ -56,7 +56,7 @@ private class DriverPluginContainer( sc.conf.set(s"${PluginContainer.EXTRA_CONF_PREFIX}$name.$k", v) } } - logInfo(s"Initialized driver component for plugin $name.") + logInfo(log"Initialized driver component for plugin ${MDC(LogKeys.CLASS_NAME, name)}.") Some((p.getClass().getName(), driverPlugin, ctx)) } else { None @@ -83,7 +83,7 @@ private class DriverPluginContainer( plugin.shutdown() } catch { case t: Throwable => - logInfo(s"Exception while shutting down plugin $name.", t) + logInfo(log"Exception while shutting down plugin ${MDC(LogKeys.CLASS_NAME, name)}.", t) } } } @@ -125,7 +125,7 @@ private class ExecutorPluginContainer( executorPlugin.init(ctx, extraConf) ctx.registerMetrics() - logInfo(s"Initialized executor component for plugin $name.") + logInfo(log"Initialized executor component for plugin ${MDC(LogKeys.CLASS_NAME, name)}.") Some(p.getClass().getName() -> executorPlugin) } else { None @@ -144,7 +144,7 @@ private class ExecutorPluginContainer( plugin.shutdown() } catch { case t: Throwable => - logInfo(s"Exception while shutting down plugin $name.", t) + logInfo(log"Exception while shutting down plugin ${MDC(LogKeys.CLASS_NAME, name)}.", t) } } } @@ -155,7 +155,8 @@ private class ExecutorPluginContainer( plugin.onTaskStart() } catch { case t: Throwable => - logInfo(s"Exception while calling onTaskStart on plugin $name.", t) + logInfo(log"Exception while calling onTaskStart on" + + log" plugin ${MDC(LogKeys.CLASS_NAME, name)}.", t) } } } @@ -166,7 +167,8 @@ private class ExecutorPluginContainer( plugin.onTaskSucceeded() } catch { case t: Throwable => - logInfo(s"Exception while calling onTaskSucceeded on plugin $name.", t) + logInfo(log"Exception while calling onTaskSucceeded on" + + log" plugin ${MDC(LogKeys.CLASS_NAME, name)}.", t) } } } @@ -177,7 +179,8 @@ private class ExecutorPluginContainer( plugin.onTaskFailed(failureReason) } catch { case t: Throwable => - logInfo(s"Exception while calling onTaskFailed on plugin $name.", t) + logInfo(log"Exception while calling onTaskFailed on" + + log" plugin ${MDC(LogKeys.CLASS_NAME, name)}.", t) } } } diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceDiscoveryScriptPlugin.scala b/core/src/main/scala/org/apache/spark/resource/ResourceDiscoveryScriptPlugin.scala index d861e91771673..51de7e2b9ac70 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceDiscoveryScriptPlugin.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceDiscoveryScriptPlugin.scala @@ -23,7 +23,8 @@ import java.util.Optional import org.apache.spark.{SparkConf, SparkException} import 
org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.resource.ResourceDiscoveryPlugin -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys import org.apache.spark.util.Utils.executeAndGetOutput /** @@ -44,7 +45,8 @@ class ResourceDiscoveryScriptPlugin extends ResourceDiscoveryPlugin with Logging val resourceName = request.id.resourceName val result = if (script.isPresent) { val scriptFile = new File(script.get) - logInfo(s"Discovering resources for $resourceName with script: $scriptFile") + logInfo(log"Discovering resources for ${MDC(LogKeys.RESOURCE_NAME, resourceName)}" + + log" with script: ${MDC(LogKeys.PATH, scriptFile)}") // check that script exists and try to execute if (scriptFile.exists()) { val output = executeAndGetOutput(Seq(script.get), new File(".")) diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala index adc1eeeb31276..7dcde35de2518 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala @@ -232,7 +232,7 @@ class ResourceProfile( } val limiting = if (taskLimit == -1) "cpu" else s"$limitingResource at $taskLimit tasks per executor" - logInfo(s"Limiting resource is $limiting") + logInfo(log"Limiting resource is ${MDC(RESOURCE, limiting)}") _executorResourceSlotsPerAddr = Some(numPartsPerResourceMap.toMap) _maxTasksPerExecutor = if (taskLimit == -1) Some(1) else Some(taskLimit) _limitingResource = Some(limitingResource) @@ -374,9 +374,9 @@ object ResourceProfile extends Logging { val defProf = new ResourceProfile(executorResources, taskResources) defProf.setToDefaultProfile() defaultProfile = Some(defProf) - logInfo("Default ResourceProfile created, executor resources: " + - s"${defProf.executorResources}, task resources: " + - s"${defProf.taskResources}") + logInfo(log"Default ResourceProfile created, executor resources: " + + log"${MDC(EXECUTOR_RESOURCES, defProf.executorResources)}, task resources: " + + log"${MDC(TASK_RESOURCES, defProf.taskResources)}") defProf } } diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala index 580a5b7bb07ac..6a6b5067f70f2 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala @@ -23,7 +23,8 @@ import scala.collection.mutable.HashMap import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.annotation.Evolving -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKeys import org.apache.spark.internal.config.Tests._ import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerResourceProfileAdded} import org.apache.spark.util.Utils @@ -140,7 +141,7 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf, if (putNewProfile) { // force the computation of maxTasks and limitingResource now so we don't have cost later rp.limitingResource(sparkConf) - logInfo(s"Added ResourceProfile id: ${rp.id}") + logInfo(log"Added ResourceProfile id: ${MDC(LogKeys.RESOURCE_PROFILE_ID, rp.id)}") listenerBus.post(SparkListenerResourceProfileAdded(rp)) } } diff --git 
a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 24f4ff1bd6728..5c93bf4bf77a0 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -26,7 +26,7 @@ import scala.jdk.CollectionConverters._ import org.apache.spark._ import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.config.CPUS_PER_TASK import org.apache.spark.internal.config.Status._ import org.apache.spark.resource.ResourceProfile.CPUS @@ -662,7 +662,7 @@ private[spark] class AppStatusListener( case e: TaskFailedReason => // All other failure cases Some(e.toErrorString) case other => - logInfo(s"Unhandled task end reason: $other") + logInfo(log"Unhandled task end reason: ${MDC(LogKeys.REASON, other)}") None } task.errorMessage = errorMessage diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 29e21e7f5ffed..87f876467c30e 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -861,7 +861,7 @@ private[spark] object AppStatusStore extends Logging { def createStorePath(rootDir: String): Option[File] = { try { val localDir = Utils.createDirectory(rootDir, "spark-ui") - logInfo(s"Created spark ui store directory at $rootDir") + logInfo(log"Created spark ui store directory at ${MDC(PATH, rootDir)}") Some(localDir) } catch { case e: IOException => diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 9a669021be754..0ac1405abe6c3 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -564,10 +564,8 @@ private[spark] object Utils // Do nothing if the file contents are the same, i.e. this file has been copied // previously. 
logInfo( - "%s has been previously copied to %s".format( - sourceFile.getAbsolutePath, - destFile.getAbsolutePath - ) + log"${MDC(SOURCE_PATH, sourceFile.getAbsolutePath)} has been previously" + + log" copied to ${MDC(DESTINATION_PATH, destFile.getAbsolutePath)}" ) return } @@ -577,7 +575,8 @@ private[spark] object Utils if (removeSourceFile) { Files.move(sourceFile.toPath, destFile.toPath) } else { - logInfo(s"Copying ${sourceFile.getAbsolutePath} to ${destFile.getAbsolutePath}") + logInfo(log"Copying ${MDC(SOURCE_PATH, sourceFile.getAbsolutePath)}" + + log" to ${MDC(DESTINATION_PATH, destFile.getAbsolutePath)}") copyRecursive(sourceFile, destFile) } } @@ -1203,7 +1202,7 @@ private[spark] object Utils val process = builder.start() if (redirectStderr) { val threadName = "redirect stderr for command " + command(0) - def log(s: String): Unit = logInfo(s) + def log(s: String): Unit = logInfo(log"${MDC(LINE, s)}") processStreamByLine(threadName, process.getErrorStream, log) } process @@ -2166,7 +2165,8 @@ private[spark] object Utils } try { val (service, port) = startService(tryPort) - logInfo(s"Successfully started service$serviceString on port $port.") + logInfo(log"Successfully started service${MDC(SERVICE_NAME, serviceString)}" + + log" on port ${MDC(PORT, port)}.") return (service, port) } catch { case e: Exception if isBindCollision(e) => @@ -2541,9 +2541,10 @@ private[spark] object Utils conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS), conf.get(EXECUTOR_INSTANCES).getOrElse(0)).max - logInfo(s"Using initial executors = $initialExecutors, max of " + - s"${DYN_ALLOCATION_INITIAL_EXECUTORS.key}, ${DYN_ALLOCATION_MIN_EXECUTORS.key} and " + - s"${EXECUTOR_INSTANCES.key}") + logInfo(log"Using initial executors = ${MDC(NUM_EXECUTORS, initialExecutors)}, max of " + + log"${MDC(CONFIG, DYN_ALLOCATION_INITIAL_EXECUTORS.key)}," + + log"${MDC(CONFIG2, DYN_ALLOCATION_MIN_EXECUTORS.key)} and" + + log" ${MDC(CONFIG3, EXECUTOR_INSTANCES.key)}") initialExecutors } @@ -2731,7 +2732,7 @@ private[spark] object Utils e.getCause() match { case uoe: UnsupportedOperationException => logDebug(s"Extension $name not being initialized.", uoe) - logInfo(s"Extension $name not being initialized.") + logInfo(log"Extension ${MDC(CLASS_NAME, name)} not being initialized.") None case null => throw e @@ -2755,8 +2756,8 @@ private[spark] object Utils // To handle master URLs, e.g., k8s://host:port. if (!masterWithoutK8sPrefix.contains("://")) { val resolvedURL = s"https://$masterWithoutK8sPrefix" - logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " + - s"URL is $resolvedURL.") + logInfo(log"No scheme specified for kubernetes master URL, so defaulting to https." 
+ + log" Resolved URL is ${MDC(LogKeys.URL, resolvedURL)}.") return s"k8s://$resolvedURL" } @@ -3009,7 +3010,7 @@ private[spark] object Utils entry = in.getNextEntry() } in.close() // so that any error in closing does not get ignored - logInfo(s"Unzipped from $dfsZipFile\n\t${files.mkString("\n\t")}") + logInfo(log"Unzipped from ${MDC(PATH, dfsZipFile)}\n\t${MDC(PATHS, files.mkString("\n\t"))}") } finally { // Close everything no matter what happened IOUtils.closeQuietly(in) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 572e69885325c..16a2f4fb6cad9 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -566,8 +566,9 @@ class ExternalAppendOnlyMap[K, V, C]( if (hasSpilled) { false } else { - logInfo(s"Task ${context.taskAttemptId()} force spilling in-memory map to disk and " + - s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") + logInfo(log"Task ${MDC(TASK_ATTEMPT_ID, context.taskAttemptId())} force spilling" + + log" in-memory map to disk and it will release " + + log"${MDC(NUM_BYTES, org.apache.spark.util.Utils.bytesToString(getUsed()))} memory") val nextUpstream = spillMemoryIteratorToDisk(upstream) assert(!upstream.hasNext) hasSpilled = true