Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,11 @@
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Test-only: provides MiniKdc, the in-process Kerberos KDC used by the
     delegation-token test suite (HadoopDelegationTokenManagerSuite) to set up
     a krb5 realm without touching the host configuration. -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.razorvine</groupId>
<artifactId>pyrolite</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,21 @@ private[spark] class HadoopDelegationTokenManager(
* @param creds Credentials object where to store the delegation tokens.
*/
def obtainDelegationTokens(creds: Credentials): Unit = {
  val currentUser = UserGroupInformation.getCurrentUser()
  // For proxy users the Kerberos TGT lives on the real (underlying) user, so check it
  // there; fall back to the current user when no real user is set. An explicitly
  // configured principal also counts as having credentials, since doLogin() can use it.
  val hasKerberosCreds = principal != null ||
    Option(currentUser.getRealUser()).getOrElse(currentUser).hasKerberosCredentials()

  // Delegation tokens can only be obtained if the real user has Kerberos credentials, so
  // skip creation when those are not available (SPARK-29082) instead of failing.
  if (hasKerberosCreds) {
    // Perform a fresh login and fetch the tokens as that UGI, storing them in `creds`.
    val freshUGI = doLogin()
    freshUGI.doAs(new PrivilegedExceptionAction[Unit]() {
      override def run(): Unit = {
        val (newTokens, _) = obtainDelegationTokens()
        creds.addAll(newTokens)
      }
    })
  }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,12 +427,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
val ugi = UserGroupInformation.getCurrentUser()
val tokens = if (dtm.renewalEnabled) {
dtm.start()
} else if (ugi.hasKerberosCredentials() || SparkHadoopUtil.get.isProxyUser(ugi)) {
} else {
val creds = ugi.getCredentials()
dtm.obtainDelegationTokens(creds)
SparkHadoopUtil.get.serialize(creds)
} else {
null
if (creds.numberOfTokens() > 0 || creds.numberOfSecretKeys() > 0) {
SparkHadoopUtil.get.serialize(creds)
} else {
null
}
}
if (tokens != null) {
updateDelegationTokens(tokens)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@

package org.apache.spark.deploy.security

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION
import org.apache.hadoop.minikdc.MiniKdc
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.security.HadoopDelegationTokenProvider
import org.apache.spark.util.Utils

private class ExceptionThrowingDelegationTokenProvider extends HadoopDelegationTokenProvider {
ExceptionThrowingDelegationTokenProvider.constructed = true
Expand Down Expand Up @@ -69,4 +75,48 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite {
assert(!manager.isProviderLoaded("hadoopfs"))
assert(manager.isProviderLoaded("hbase"))
}

test("SPARK-29082: do not fail if current user does not have credentials") {
  // SparkHadoopUtil overrides the UGI configuration during initialization. That normally
  // happens early in the Spark application, but here it may affect the test depending on
  // how it's run, so force its initialization.
  SparkHadoopUtil.get

  var kdc: MiniKdc = null
  try {
    // UserGroupInformation.setConfiguration needs the default Kerberos realm, which can
    // be set in krb5.conf. MiniKdc sets "java.security.krb5.conf" in start() and removes
    // it again when stop() is called, so it must be started before setConfiguration and
    // stopped in the finally block below.
    val kdcDir = Utils.createTempDir()
    val kdcConf = MiniKdc.createConf()
    kdc = new MiniKdc(kdcConf, kdcDir)
    kdc.start()

    val krbConf = new Configuration()
    krbConf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos")
    UserGroupInformation.setConfiguration(krbConf)

    val manager = new HadoopDelegationTokenManager(new SparkConf(false), krbConf, null)
    // With security enabled but no Kerberos credentials available, token creation must be
    // skipped (no tokens, no secret keys) rather than throwing.
    val testImpl = new PrivilegedExceptionAction[Unit] {
      override def run(): Unit = {
        assert(UserGroupInformation.isSecurityEnabled())
        val creds = new Credentials()
        manager.obtainDelegationTokens(creds)
        assert(creds.numberOfTokens() === 0)
        assert(creds.numberOfSecretKeys() === 0)
      }
    }

    // Exercise both a plain user and a proxy user; neither has Kerberos credentials.
    val realUser = UserGroupInformation.createUserForTesting("realUser", Array.empty)
    realUser.doAs(testImpl)

    val proxyUser = UserGroupInformation.createProxyUserForTesting("proxyUser", realUser,
      Array.empty)
    proxyUser.doAs(testImpl)
  } finally {
    // Stop MiniKdc so it restores "java.security.krb5.conf", and reset UGI so the
    // kerberos configuration does not leak into other tests.
    if (kdc != null) {
      kdc.stop()
    }
    UserGroupInformation.reset()
  }
}
}