
[SPARK-47573][K8S] Support custom driver log url #45728


Open · wants to merge 17 commits into base: master · Changes from all commits
@@ -15,17 +15,17 @@
* limitations under the License.
*/

package org.apache.spark.executor
package org.apache.spark

import java.util.concurrent.atomic.AtomicBoolean

import scala.util.matching.Regex

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys
import org.apache.spark.internal.{Logging, LogKeys, MDC}

private[spark] class ExecutorLogUrlHandler(logUrlPattern: Option[String]) extends Logging {
import ExecutorLogUrlHandler._

private[spark] class LogUrlHandler(logUrlPattern: Option[String]) extends Logging {
import LogUrlHandler._

private val informedForMissingAttributes = new AtomicBoolean(false)

@@ -83,14 +83,14 @@ private[spark] class ExecutorLogUrlHandler(logUrlPattern: Option[String]) extend
allPatterns: Set[String],
allAttributes: Set[String]): Unit = {
if (informedForMissingAttributes.compareAndSet(false, true)) {
logInfo(log"Fail to renew executor log urls: ${MDC(LogKeys.REASON, reason)}." +
logInfo(log"Fail to renew log urls: ${MDC(LogKeys.REASON, reason)}." +
log" Required: ${MDC(LogKeys.REGEX, allPatterns)} / " +
log"available: ${MDC(LogKeys.ATTRIBUTE_MAP, allAttributes)}." +
log" Falling back to show app's original log urls.")
}
}
}

private[spark] object ExecutorLogUrlHandler {
private[spark] object LogUrlHandler {
val CUSTOM_URL_PATTERN_REGEX: Regex = "\\{\\{([A-Za-z0-9_\\-]+)\\}\\}".r
}
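
For illustration, a minimal, self-contained sketch of the `{{VAR}}` substitution that `CUSTOM_URL_PATTERN_REGEX` enables. The object and the `substitute` helper below are hypothetical, not the actual `LogUrlHandler.applyPattern` implementation; like the warning path above, it falls back (here to `None`) when a required attribute is missing:

```scala
import scala.util.matching.Regex

object LogUrlPatternSketch {
  // Same token syntax as CUSTOM_URL_PATTERN_REGEX above: {{VAR_NAME}}.
  val customUrlPattern: Regex = "\\{\\{([A-Za-z0-9_\\-]+)\\}\\}".r

  // Replace every {{NAME}} token with the matching attribute value; if any
  // referenced attribute is missing, return None so the caller can keep the
  // original log urls instead.
  def substitute(pattern: String, attributes: Map[String, String]): Option[String] = {
    val required = customUrlPattern.findAllMatchIn(pattern).map(_.group(1)).toSet
    if (required.subsetOf(attributes.keySet)) {
      Some(customUrlPattern.replaceAllIn(
        pattern, m => Regex.quoteReplacement(attributes(m.group(1)))))
    } else {
      None
    }
  }

  def main(args: Array[String]): Unit = {
    val url = substitute(
      "https://log-server/driverLog?appId={{APP_ID}}&podName={{KUBERNETES_POD_NAME}}",
      Map("APP_ID" -> "spark-app-123", "KUBERNETES_POD_NAME" -> "spark-driver-pod"))
    println(url)
    // Some(https://log-server/driverLog?appId=spark-app-123&podName=spark-driver-pod)
  }
}
```
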
@@ -17,8 +17,7 @@

package org.apache.spark.deploy.history

import org.apache.spark.SparkConf
import org.apache.spark.executor.ExecutorLogUrlHandler
import org.apache.spark.{LogUrlHandler, SparkConf}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.History._
import org.apache.spark.status.AppStatusStore
@@ -40,7 +39,7 @@ private[spark] class HistoryAppStatusStore(
}
}

private val logUrlHandler = new ExecutorLogUrlHandler(logUrlPattern)
private val logUrlHandler = new LogUrlHandler(logUrlPattern)

override def executorList(activeOnly: Boolean): Seq[v1.ExecutorSummary] = {
val execList = super.executorList(activeOnly)
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/UI.scala
@@ -220,6 +220,18 @@ private[spark] object UI {
.stringConf
.createOptional

val CUSTOM_DRIVER_LOG_URL = ConfigBuilder("spark.ui.custom.driver.log.url")
Member:

Since this doesn't have the spark.kubernetes. prefix, this is supposed to work with YARN, @EnricoMi. Are you sure?

Contributor Author (@EnricoMi), Sep 13, 2024:
This was just derived from spark.ui.custom.executor.log.url, which I suspect this should mirror. The property name does not mention YARN, and the documentation says that only K8s supports this.

.doc("Specifies custom Spark driver log url for supporting external log service instead of " +
"using cluster managers' application log urls in the Spark UI. Spark will support " +
"some path variables via patterns which can vary on cluster manager. Please check the " +
"documentation for your cluster manager to see which patterns are supported, if any. " +
"This configuration replaces original log urls in event log, which will be also effective " +
"when accessing the application on history server. The new log urls must be permanent, " +
"otherwise you might have dead link for executor log urls.")
.version("4.1.0")
.stringConf
.createOptional

val CUSTOM_EXECUTOR_LOG_URL = ConfigBuilder("spark.ui.custom.executor.log.url")
.doc("Specifies custom spark executor log url for supporting external log service instead of " +
"using cluster managers' application log urls in the Spark UI. Spark will support " +
@@ -27,11 +27,10 @@ import scala.concurrent.Future
import com.google.common.cache.CacheBuilder
import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.{ExecutorAllocationClient, SparkEnv, TaskState}
import org.apache.spark.{ExecutorAllocationClient, LogUrlHandler, SparkEnv, TaskState}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.executor.ExecutorLogUrlHandler
import org.apache.spark.internal.{config, Logging, MDC}
import org.apache.spark.internal.LogKeys
import org.apache.spark.internal.LogKeys._
@@ -156,7 +155,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
.filter { case (k, _) => k.startsWith("spark.") }
.toImmutableArraySeq

private val logUrlHandler: ExecutorLogUrlHandler = new ExecutorLogUrlHandler(
private val logUrlHandler: LogUrlHandler = new LogUrlHandler(
conf.get(UI.CUSTOM_EXECUTOR_LOG_URL))

override def onStart(): Unit = {
18 changes: 17 additions & 1 deletion docs/configuration.md
@@ -1675,6 +1675,22 @@ Apart from these, the following properties are also available, and may be useful
</td>
<td>2.1.0</td>
</tr>
<tr>
<td><code>spark.ui.custom.driver.log.url</code></td>
<td>(none)</td>
<td>
Specifies custom Spark driver log URL for supporting external log service instead of using cluster
managers' application log URLs in Spark UI. Spark will support some path variables via patterns
which can vary on cluster manager. Please check the documentation for your cluster manager to
see which patterns are supported, if any. <p/>
Please note that this configuration also replaces original log urls in the event log,
which will also be effective when accessing the application on the history server. The new log urls must be
permanent, otherwise you might have dead links for driver log urls.
<p/>
For now, only the K8s cluster manager supports this configuration.
</td>
<td>4.1.0</td>
</tr>
<tr>
<td><code>spark.ui.custom.executor.log.url</code></td>
<td>(none)</td>
@@ -1687,7 +1703,7 @@ Apart from these, the following properties are also available, and may be useful
which will be also effective when accessing the application on history server. The new log urls must be
permanent, otherwise you might have dead link for executor log urls.
<p/>
For now, only YARN and K8s cluster manager supports this configuration
For now, only the YARN and K8s cluster managers support this configuration.
</td>
<td>3.0.0</td>
</tr>
10 changes: 7 additions & 3 deletions docs/running-on-kubernetes.md
@@ -433,14 +433,18 @@ the cluster.
When there exists a log collection system, you can expose it at Spark Driver `Executors` tab UI. For example,

```
spark.ui.custom.executor.log.url='https://log-server/log?appId={{APP_ID}}&execId={{EXECUTOR_ID}}'
spark.ui.custom.driver.log.url='https://log-server/driverLog?appId={{APP_ID}}&namespace={{KUBERNETES_NAMESPACE}}&podName={{KUBERNETES_POD_NAME}}'
spark.ui.custom.executor.log.url='https://log-server/executorLog?appId={{APP_ID}}&execId={{EXECUTOR_ID}}'
```

You can add additional custom variables to this url template, populated with the values of existing executor environment variables like
You can add additional custom variables to these url templates, populated with the values of existing driver and executor environment variables like

```
spark.kubernetes.driverEnv.SPARK_DRIVER_ATTRIBUTE_YOUR_VAR='$(EXISTING_DRIVER_ENV_VAR)'
spark.executorEnv.SPARK_EXECUTOR_ATTRIBUTE_YOUR_VAR='$(EXISTING_EXECUTOR_ENV_VAR)'
spark.ui.custom.executor.log.url='https://log-server/log?appId={{APP_ID}}&execId={{EXECUTOR_ID}}&your_var={{YOUR_VAR}}'

spark.ui.custom.driver.log.url='https://log-server/driverLog?appId={{APP_ID}}&podName={{KUBERNETES_POD_NAME}}&your_var={{YOUR_VAR}}'
spark.ui.custom.executor.log.url='https://log-server/executorLog?appId={{APP_ID}}&execId={{EXECUTOR_ID}}&your_var={{YOUR_VAR}}'
```
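
Equivalently, assuming the same illustrative log-server endpoints, the two patterns can be set programmatically on a `SparkSession` (a sketch, not part of this change):

```scala
import org.apache.spark.sql.SparkSession

// The {{...}} tokens are stored verbatim in the configuration; Spark resolves
// them against the driver/executor attributes at runtime.
val spark = SparkSession.builder()
  .config("spark.ui.custom.driver.log.url",
    "https://log-server/driverLog?appId={{APP_ID}}&namespace={{KUBERNETES_NAMESPACE}}&podName={{KUBERNETES_POD_NAME}}")
  .config("spark.ui.custom.executor.log.url",
    "https://log-server/executorLog?appId={{APP_ID}}&execId={{EXECUTOR_ID}}")
  .getOrCreate()
```
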

### Accessing Driver UI
@@ -63,6 +63,9 @@ object Constants {
val UI_PORT_NAME = "spark-ui"

// Environment Variables
val ENV_DRIVER_ATTRIBUTE_APP_ID = "SPARK_DRIVER_ATTRIBUTE_APP_ID"
val ENV_DRIVER_ATTRIBUTE_KUBERNETES_NAMESPACE = "SPARK_DRIVER_ATTRIBUTE_KUBERNETES_NAMESPACE"
Member:
Just a question: do we need this K8s namespace attribute additionally only for driver logs?

Contributor Author:
yes

val ENV_DRIVER_ATTRIBUTE_KUBERNETES_POD_NAME = "SPARK_DRIVER_ATTRIBUTE_KUBERNETES_POD_NAME"
val ENV_DRIVER_POD_IP = "SPARK_DRIVER_POD_IP"
val ENV_DRIVER_URL = "SPARK_DRIVER_URL"
val ENV_EXECUTOR_CORES = "SPARK_EXECUTOR_CORES"
@@ -82,8 +82,15 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB

override def configurePod(pod: SparkPod): SparkPod = {
val driverAttributes = conf.get(UI.CUSTOM_DRIVER_LOG_URL).map { _ =>
Map(
ENV_DRIVER_ATTRIBUTE_APP_ID -> conf.appId,
ENV_DRIVER_ATTRIBUTE_KUBERNETES_NAMESPACE -> conf.get(KUBERNETES_NAMESPACE),
ENV_DRIVER_ATTRIBUTE_KUBERNETES_POD_NAME -> driverPodName
)
}.getOrElse(Map.empty[String, String])
val driverCustomEnvs = KubernetesUtils.buildEnvVars(
Seq(ENV_APPLICATION_ID -> conf.appId) ++ conf.environment)
Seq(ENV_APPLICATION_ID -> conf.appId) ++ conf.environment ++ driverAttributes)
Member:
Is it inevitable to inject both ENV_APPLICATION_ID and ENV_DRIVER_ATTRIBUTE_APP_ID? This looks like a redundancy to me.

Contributor Author:
This is the same pattern as on the executor side:

val attributes = if (kubernetesConf.get(UI.CUSTOM_EXECUTOR_LOG_URL).isDefined) {
  Map(
    ENV_EXECUTOR_ATTRIBUTE_APP_ID -> kubernetesConf.appId,
    ENV_EXECUTOR_ATTRIBUTE_EXECUTOR_ID -> kubernetesConf.executorId)
} else {
  Map.empty[String, String]
}
KubernetesUtils.buildEnvVars(
  Seq(
    ENV_DRIVER_URL -> driverUrl,
    ENV_EXECUTOR_CORES -> execResources.cores.get.toString,
    ENV_EXECUTOR_MEMORY -> executorMemoryString,
    ENV_APPLICATION_ID -> kubernetesConf.appId,
    // This is to set the SPARK_CONF_DIR to be /opt/spark/conf
    ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL,
    ENV_EXECUTOR_ID -> kubernetesConf.executorId,
    ENV_RESOURCE_PROFILE_ID -> resourceProfile.id.toString)
    ++ attributes
    ++ kubernetesConf.environment
    ++ sparkAuthSecret
    ++ Seq(ENV_CLASSPATH -> kubernetesConf.get(EXECUTOR_CLASS_PATH).orNull)
    ++ allOpts) ++
  KubernetesUtils.buildEnvVarsWithFieldRef(
    Seq(
      (ENV_EXECUTOR_POD_IP, "v1", "status.podIP"),
      (ENV_EXECUTOR_POD_NAME, "v1", "metadata.name")
    ))

val driverCpuQuantity = new Quantity(driverCoresRequest)
val driverMemoryQuantity = new Quantity(s"${driverMemoryWithOverheadMiB}Mi")
val maybeCpuLimitQuantity = driverLimitCores.map { limitCores =>
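
To make the conditional above concrete, here is a standalone sketch of the same logic with plain inputs standing in for `KubernetesDriverConf` (the object, method, and parameter names are illustrative, not part of the change):

```scala
object DriverAttributeEnvSketch {
  // Attribute env vars are only injected when a custom driver log url pattern is set.
  def driverAttributeEnv(
      customDriverLogUrl: Option[String],
      appId: String,
      namespace: String,
      podName: String): Map[String, String] = {
    customDriverLogUrl.map { _ =>
      Map(
        "SPARK_DRIVER_ATTRIBUTE_APP_ID" -> appId,
        "SPARK_DRIVER_ATTRIBUTE_KUBERNETES_NAMESPACE" -> namespace,
        "SPARK_DRIVER_ATTRIBUTE_KUBERNETES_POD_NAME" -> podName)
    }.getOrElse(Map.empty[String, String])
  }

  def main(args: Array[String]): Unit = {
    // With the config unset, no attribute env vars are added to the driver pod spec.
    assert(driverAttributeEnv(None, "spark-123", "default", "driver-pod").isEmpty)
    // With a pattern set, the three attribute env vars are added.
    println(driverAttributeEnv(Some("url-pattern"), "spark-123", "default", "driver-pod"))
  }
}
```
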
@@ -16,6 +16,7 @@
*/
package org.apache.spark.scheduler.cluster.k8s

import java.util.Locale
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

@@ -26,19 +27,18 @@ import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.api.model.PodBuilder
import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.SparkContext
import org.apache.spark.{LogUrlHandler, SparkContext}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.LogKeys.{COUNT, TOTAL}
import org.apache.spark.internal.MDC
import org.apache.spark.internal.config.SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO
import org.apache.spark.internal.config.{SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO, UI}
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.rpc.{RpcAddress, RpcCallContext}
import org.apache.spark.scheduler.{ExecutorDecommission, ExecutorDecommissionInfo, ExecutorKilled, ExecutorLossReason,
TaskSchedulerImpl}
import org.apache.spark.scheduler.{ExecutorDecommission, ExecutorDecommissionInfo, ExecutorKilled, ExecutorLossReason, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
import org.apache.spark.util.{ThreadUtils, Utils}
@@ -104,6 +104,20 @@ private[spark] class KubernetesClusterSchedulerBackend(
conf.getOption("spark.app.id").getOrElse(appId)
}

def extractAttributes: Map[String, String] = {
val prefix = "SPARK_DRIVER_ATTRIBUTE_"
sys.env.filter { case (k, _) => k.startsWith(prefix) }
.map(e => (e._1.substring(prefix.length).toUpperCase(Locale.ROOT), e._2))
}

override def getDriverAttributes: Option[Map[String, String]] =
Some(Map("LOG_FILES" -> "log") ++ extractAttributes)

override def getDriverLogUrls: Option[Map[String, String]] = {
val logUrlHandler = new LogUrlHandler(conf.get(UI.CUSTOM_DRIVER_LOG_URL))
getDriverAttributes.map(attr => logUrlHandler.applyPattern(Map.empty, attr)).filter(_.nonEmpty)
}

override def start(): Unit = {
super.start()
// Must be called before setting the executors
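
A standalone illustration of the `SPARK_DRIVER_ATTRIBUTE_*` extraction above, taking an explicit map instead of `sys.env` so it can run anywhere (the object name and `main` are illustrative only):

```scala
import java.util.Locale

object DriverAttributeExtractionSketch {
  // Keep only SPARK_DRIVER_ATTRIBUTE_* variables, strip the prefix, and
  // upper-case the remainder so it matches the {{VAR}} tokens in the url pattern.
  def extractAttributes(env: Map[String, String]): Map[String, String] = {
    val prefix = "SPARK_DRIVER_ATTRIBUTE_"
    env.filter { case (k, _) => k.startsWith(prefix) }
      .map { case (k, v) => k.substring(prefix.length).toUpperCase(Locale.ROOT) -> v }
  }

  def main(args: Array[String]): Unit = {
    val attrs = extractAttributes(Map(
      "SPARK_DRIVER_ATTRIBUTE_APP_ID" -> "spark-123",
      "SPARK_DRIVER_ATTRIBUTE_KUBERNETES_POD_NAME" -> "spark-driver-pod",
      "UNRELATED_VAR" -> "ignored"))
    println(attrs)
    // Map(APP_ID -> spark-123, KUBERNETES_POD_NAME -> spark-driver-pod)
  }
}
```
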
@@ -449,6 +449,35 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
assert(amountAndFormat(limits("memory")) === "5500Mi")
}

test("SPARK-47573: Add some SPARK_DRIVER_ATTRIBUTE_* if CUSTOM_DRIVER_LOG_URL" +
" is defined") {
val sparkConf = new SparkConf()
.set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
.set(CONTAINER_IMAGE, "spark-driver:latest")
.set(CUSTOM_DRIVER_LOG_URL, "url-pattern")
val kubernetesConf: KubernetesDriverConf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf,
labels = CUSTOM_DRIVER_LABELS,
environment = DRIVER_ENVS,
annotations = DRIVER_ANNOTATIONS)

val featureStep = new BasicDriverFeatureStep(kubernetesConf)
val basePod = SparkPod.initialPod()
val configuredPod = featureStep.configurePod(basePod)

val envs = configuredPod.container
.getEnv
.asScala
.map { env => (env.getName, env.getValue) }
.toMap
Map(
ENV_DRIVER_ATTRIBUTE_APP_ID -> "appId",
ENV_DRIVER_ATTRIBUTE_KUBERNETES_NAMESPACE -> "default",
ENV_DRIVER_ATTRIBUTE_KUBERNETES_POD_NAME -> "spark-driver-pod"
).foreach { case (k, v) =>
assert(envs(k) === v)
}
}

def containerPort(name: String, portNumber: Int): ContainerPort =
new ContainerPortBuilder()
@@ -32,6 +32,7 @@ import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.internal.config.UI
import org.apache.spark.resource.{ResourceProfile, ResourceProfileManager}
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.{ExecutorKilled, LiveListenerBus, TaskSchedulerImpl}
@@ -47,6 +48,11 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
.set("spark.app.id", TEST_SPARK_APP_ID)
.set(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL.key, "soLong")
.set(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL_VALUE.key, "cruelWorld")
.set(
UI.CUSTOM_DRIVER_LOG_URL.key,
"https://my-custom.url/api/logs?applicationId={{APP_ID}}&namespace={{KUBERNETES_NAMESPACE}}" +
"&pod_name={{KUBERNETES_POD_NAME}}"
)

@Mock
private var sc: SparkContext = _
@@ -259,4 +265,55 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
endpoint.receiveAndReply(context).apply(GenerateExecID("cheeseBurger"))
verify(context).reply("1")
}

test("SPARK-47573: Driver attributes") {
assert(schedulerBackendUnderTest.getDriverAttributes === Some(Map(
"LOG_FILES" -> "log"
)))

withEnvs(
ENV_DRIVER_ATTRIBUTE_APP_ID -> "spark-app-id",
ENV_DRIVER_ATTRIBUTE_KUBERNETES_NAMESPACE -> "default",
ENV_DRIVER_ATTRIBUTE_KUBERNETES_POD_NAME -> "pod.name"
) {
assert(schedulerBackendUnderTest.getDriverAttributes === Some(Map(
"LOG_FILES" -> "log",
"APP_ID" -> "spark-app-id",
"KUBERNETES_NAMESPACE" -> "default",
"KUBERNETES_POD_NAME" -> "pod.name"
)))
}
}

test("SPARK-47573: Driver log urls") {
assert(schedulerBackendUnderTest.getDriverLogUrls === None)
withEnvs(
ENV_DRIVER_ATTRIBUTE_APP_ID -> "spark-app-id",
ENV_DRIVER_ATTRIBUTE_KUBERNETES_NAMESPACE -> "default",
ENV_DRIVER_ATTRIBUTE_KUBERNETES_POD_NAME -> "pod.name"
) {
assert(schedulerBackendUnderTest.getDriverLogUrls === Some(Map(
"log" -> ("https://my-custom.url/api/logs?applicationId=spark-app-id&namespace=default" +
"&pod_name=pod.name")
)))
}
}

  private def withEnvs(pairs: (String, String)*)(f: => Unit): Unit = {
    // System.getenv() returns an unmodifiable view; reach into its backing map via
    // reflection so the SPARK_DRIVER_ATTRIBUTE_* variables can be injected for the test.
    val readonlyEnv = System.getenv()
    val field = readonlyEnv.getClass.getDeclaredField("m")
    field.setAccessible(true)
    val modifiableEnv = field.get(readonlyEnv).asInstanceOf[java.util.Map[String, String]]
    try {
      for ((k, v) <- pairs) {
        assert(!modifiableEnv.containsKey(k))
        modifiableEnv.put(k, v)
      }
      f
    } finally {
      // Always restore the original environment, even if `f` throws.
      for ((k, _) <- pairs) {
        modifiableEnv.remove(k)
      }
    }
  }
}