Running diff for Token Renewal #9

Open
wants to merge 3 commits into base: SPARK-23781
HadoopDelegationTokenManager.scala
@@ -70,8 +70,8 @@ private[spark] class HadoopDelegationTokenManager(
"spark.yarn.security.credentials.%s.enabled")
private val providerEnabledConfig = "spark.security.credentials.%s.enabled"

private val principal = sparkConf.get(PRINCIPAL).orNull
private val keytab = sparkConf.get(KEYTAB).orNull
protected val principal = sparkConf.get(PRINCIPAL).orNull
protected val keytab = sparkConf.get(KEYTAB).orNull

require((principal == null) == (keytab == null),
"Both principal and keytab must be defined, or neither.")
@@ -81,8 +81,8 @@ private[spark] class HadoopDelegationTokenManager(
logDebug("Using the following builtin delegation token providers: " +
s"${delegationTokenProviders.keys.mkString(", ")}.")

private var renewalExecutor: ScheduledExecutorService = _
private val driverRef = new AtomicReference[RpcEndpointRef]()
protected var renewalExecutor: ScheduledExecutorService = _
protected val driverRef = new AtomicReference[RpcEndpointRef]()

protected def setDriverRef(ref: RpcEndpointRef): Unit = {
driverRef.set(ref)
@@ -262,7 +262,7 @@ private[spark] class HadoopDelegationTokenManager(
*
* @return Credentials containing the new tokens.
*/
private def obtainTokensAndScheduleRenewal(ugi: UserGroupInformation): Credentials = {
protected def obtainTokensAndScheduleRenewal(ugi: UserGroupInformation): Credentials = {
ugi.doAs(new PrivilegedExceptionAction[Credentials]() {
override def run(): Credentials = {
val creds = new Credentials()
Expand All @@ -279,7 +279,7 @@ private[spark] class HadoopDelegationTokenManager(
})
}

private def doLogin(): UserGroupInformation = {
protected def doLogin(): UserGroupInformation = {
logInfo(s"Attempting to login to KDC using principal: $principal")
val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
logInfo("Successfully logged into KDC.")
CoarseGrainedSchedulerBackend.scala
@@ -708,7 +708,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* fully constructed), only if security is enabled in the Hadoop configuration.
*/
protected def createTokenManager(): Option[HadoopDelegationTokenManager] = None

}

private[spark] object CoarseGrainedSchedulerBackend {
Config.scala
@@ -262,6 +262,14 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional

val KUBERNETES_KERBEROS_DT_SECRET_RENEWAL =
ConfigBuilder("spark.kubernetes.kerberos.tokenSecret.renewal")
.doc("Enabling the driver to watch the secret specified at " +
"spark.kubernetes.kerberos.tokenSecret.name for updates so that the " +
"tokens can be propogated to the executors.")
.booleanConf
.createWithDefault(false)

val APP_RESOURCE_TYPE =
ConfigBuilder("spark.kubernetes.resource.type")
.doc("This sets the resource type internally")
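The new flag pairs with the existing spark.kubernetes.kerberos.tokenSecret.name setting (KUBERNETES_KERBEROS_DT_SECRET_NAME). A minimal sketch of enabling this mode on a SparkConf; the secret name "spark-dt-secret" is hypothetical:

import org.apache.spark.SparkConf

// Hedged sketch: point the driver at a pre-created token secret and turn on
// the watcher-based renewal added by this change.
val conf = new SparkConf()
  .set("spark.kubernetes.kerberos.tokenSecret.name", "spark-dt-secret") // hypothetical name
  .set("spark.kubernetes.kerberos.tokenSecret.renewal", "true")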
Constants.scala
@@ -101,6 +101,7 @@ private[spark] object Constants {
val KERBEROS_SPARK_USER_NAME =
"spark.kubernetes.kerberos.spark-user-name"
val KERBEROS_SECRET_KEY = "hadoop-tokens"
val SECRET_DATA_ITEM_PREFIX_TOKENS = "spark.kubernetes.dt-"

// Hadoop credentials secrets for the Spark app.
val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials"
KubernetesConf.scala
@@ -78,7 +78,7 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
def krbConfigMapName: String = s"$appResourceNamePrefix-krb5-file"

def tokenManager(conf: SparkConf, hConf: Configuration): KubernetesHadoopDelegationTokenManager =
new KubernetesHadoopDelegationTokenManager(conf, hConf)
new KubernetesHadoopDelegationTokenManager(conf, hConf, None)

def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)

KubernetesHadoopDelegationTokenManager.scala
@@ -17,21 +17,110 @@

package org.apache.spark.deploy.k8s.security

import java.io.{ByteArrayInputStream, DataInputStream}
import java.io.File

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.Secret
import io.fabric8.kubernetes.client.{KubernetesClientException, Watch, Watcher}
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.Watcher.Action
import org.apache.commons.codec.binary.Base64
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants.SECRET_DATA_ITEM_PREFIX_TOKENS
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens

/**
* Adds Kubernetes-specific functionality to HadoopDelegationTokenManager.
*/
private[spark] class KubernetesHadoopDelegationTokenManager(
_sparkConf: SparkConf,
_hadoopConf: Configuration)
_hadoopConf: Configuration,
kubernetesClient: Option[KubernetesClient])
extends HadoopDelegationTokenManager(_sparkConf, _hadoopConf) {

def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser
def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled

if (principal != null) {
require(keytab != null, "Kerberos principal specified without a keytab.")
require(new File(keytab).isFile, s"Cannot find keytab at $keytab.")
}
private val isTokenRenewalEnabled =
_sparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_RENEWAL)

private val dtSecretName = _sparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_NAME)
if (isTokenRenewalEnabled) {
require(dtSecretName.isDefined,
"Must specify the token secret which the driver must watch for updates")
}

private def deserialize(credentials: Credentials, data: Array[Byte]): Unit = {
val byteStream = new ByteArrayInputStream(data)
val dataStream = new DataInputStream(byteStream)
credentials.readTokenStorageStream(dataStream)
}

private var watch: Watch = _

/**
* As in HadoopDelegationTokenManager, this starts the token renewer.
* Upon start, if a principal has been configured, the renewer will:
*
* - log in as the configured principal, and set up a task to keep that user's ticket renewed
* - obtain delegation tokens from all available providers
* - schedule a periodic task to update the tokens when needed.
*
* If a principal is NOT configured, a long-running app can still be serviced by enabling
* the KUBERNETES_KERBEROS_DT_SECRET_RENEWAL config and relying on an external service to
* populate a secret with valid delegation tokens, which the application will then use.
* In that mode the driver watches the secret for updates, reads the refreshed token data
* when it changes, and sends it to the executors so that expiring tokens are replaced.
*
* @param driverEndpoint If provided, the driver where to send the newly generated tokens.
* The same ref will also receive future token updates unless overridden later.
* @return The newly logged in user, or null when the secret watcher is used instead.
*/
override def start(driverEndpoint: Option[RpcEndpointRef] = None): UserGroupInformation = {
driverEndpoint.foreach(super.setDriverRef)
val driver = driverRef.get()
if (isTokenRenewalEnabled &&
kubernetesClient.isDefined && driverEndpoint.isDefined && driver != null) {
watch = kubernetesClient.get
.secrets()
.withName(dtSecretName.get)
.watch(new Watcher[Secret] {
override def onClose(cause: KubernetesClientException): Unit =
logInfo("Ending the watch of DT Secret")
override def eventReceived(action: Watcher.Action, resource: Secret): Unit = {
action match {
case Action.ADDED | Action.MODIFIED =>
logInfo("Token secret was updated")
// Token data items share the SECRET_DATA_ITEM_PREFIX_TOKENS prefix; keys are
// expected to sort so that the lexicographically largest one is the newest.
val dataItems = resource.getData.asScala.filterKeys(
_.startsWith(SECRET_DATA_ITEM_PREFIX_TOKENS)).toSeq.sorted
val latestToken = dataItems.lastOption
latestToken.foreach {
case (_, data) =>
val credentials = new Credentials
deserialize(credentials, Base64.decodeBase64(data))
val tokens = SparkHadoopUtil.get.serialize(credentials)
driver.send(UpdateDelegationTokens(tokens))
}
case _ =>
// Nothing to propagate on DELETED or ERROR events.
}
}
})
null
} else {
super.start(driverEndpoint)
}
}
}
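For context, a minimal sketch of the external-service side of the protocol this class assumes: token data items live in the watched secret under keys prefixed with spark.kubernetes.dt- (SECRET_DATA_ITEM_PREFIX_TOKENS), values are base64-encoded Hadoop token storage streams, and the lexicographically largest key is treated as the most recent. The object name, key format, and client construction below are assumptions, not part of this change:

import java.io.{ByteArrayOutputStream, DataOutputStream}

import io.fabric8.kubernetes.client.DefaultKubernetesClient
import org.apache.commons.codec.binary.Base64
import org.apache.hadoop.security.Credentials

// Hedged sketch of an external token-refresh service feeding the watcher above.
object TokenSecretPopulator {
  def publish(creds: Credentials, secretName: String): Unit = {
    // Serialize into the Hadoop token storage format that deserialize() reads back.
    val byteStream = new ByteArrayOutputStream()
    creds.writeTokenStorageToStream(new DataOutputStream(byteStream))
    val encoded = Base64.encodeBase64String(byteStream.toByteArray)
    // Assumed key format: prefix plus zero-padded millis, so that
    // lexicographic order matches recency.
    val key = f"spark.kubernetes.dt-${System.currentTimeMillis()}%020d"
    val client = new DefaultKubernetesClient()
    try {
      client.secrets().withName(secretName).edit().addToData(key, encoded).done()
    } finally {
      client.close()
    }
  }
}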
KubernetesClusterSchedulerBackend.scala
@@ -21,7 +21,10 @@ import java.util.concurrent.ExecutorService
import io.fabric8.kubernetes.client.KubernetesClient
import scala.concurrent.{ExecutionContext, Future}

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.rpc.{RpcAddress, RpcEnv}
import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
@@ -126,6 +129,12 @@ private[spark] class KubernetesClusterSchedulerBackend(
new KubernetesDriverEndpoint(rpcEnv, properties)
}

override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = {
Some(new KubernetesHadoopDelegationTokenManager(conf,
SparkHadoopUtil.get.newConfiguration(conf),
kubernetesClient))
}

private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends DriverEndpoint(rpcEnv, sparkProperties) {

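For orientation, a rough sketch of how the createTokenManager hook above is expected to be exercised by the base CoarseGrainedSchedulerBackend. This is illustrative only, not the actual Spark core call site; driverEndpoint is an assumed reference to the backend's RPC endpoint:

// Illustrative wiring only (assumed names): core creates the token manager
// and starts it so tokens, and later updates, flow to the executors.
createTokenManager().foreach { manager =>
  // start() returns the logged-in UGI, or null when the secret watcher is used.
  manager.start(Some(driverEndpoint))
}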