Skip to content

[SPARK-13478][YARN] Use real user when fetching delegation tokens. #16665

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
"either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
}
}

if (proxyUser != null && principal != null) {
SparkSubmit.printErrorAndExit("Only one of --proxy-user or --principal can be provided.")
}
}

private def validateKillArguments(): Unit = {
Expand Down Expand Up @@ -514,6 +518,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).
|
| --proxy-user NAME User to impersonate when submitting the application.
| This argument does not work with --principal / --keytab.
|
| --help, -h Show this help message and exit
| --verbose, -v Print additional debug output
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package org.apache.spark.deploy.yarn

import java.io.File
import java.lang.reflect.UndeclaredThrowableException
import java.nio.charset.StandardCharsets.UTF_8
import java.security.PrivilegedExceptionAction
import java.util.regex.Matcher
import java.util.regex.Pattern

Expand Down Expand Up @@ -156,7 +158,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
*/
def obtainTokenForHiveMetastore(conf: Configuration): Option[Token[DelegationTokenIdentifier]] = {
try {
obtainTokenForHiveMetastoreInner(conf, UserGroupInformation.getCurrentUser().getUserName)
obtainTokenForHiveMetastoreInner(conf)
} catch {
case e: ClassNotFoundException =>
logInfo(s"Hive class not found $e")
Expand All @@ -171,8 +173,8 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
* @param conf the Hadoop configuration used to locate and contact the Hive metastore.
* @return a delegation token
*/
private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration,
username: String): Option[Token[DelegationTokenIdentifier]] = {
private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration):
Option[Token[DelegationTokenIdentifier]] = {
val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)

// the hive configuration class is a subclass of Hadoop Configuration, so can be cast down
Expand All @@ -187,11 +189,12 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {

// Check for local metastore
if (metastoreUri.nonEmpty) {
require(username.nonEmpty, "Username undefined")
val principalKey = "hive.metastore.kerberos.principal"
val principal = hiveConf.getTrimmed(principalKey, "")
require(principal.nonEmpty, "Hive principal $principalKey undefined")
logDebug(s"Getting Hive delegation token for $username against $principal at $metastoreUri")
val currentUser = UserGroupInformation.getCurrentUser()
logDebug(s"Getting Hive delegation token for ${currentUser.getUserName()} against " +
s"$principal at $metastoreUri")
val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
val closeCurrent = hiveClass.getMethod("closeCurrent")
try {
Expand All @@ -200,12 +203,14 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
classOf[String], classOf[String])
val getHive = hiveClass.getMethod("get", hiveConfClass)

// invoke
val hive = getHive.invoke(null, hiveConf)
val tokenStr = getDelegationToken.invoke(hive, username, principal).asInstanceOf[String]
val hive2Token = new Token[DelegationTokenIdentifier]()
hive2Token.decodeFromUrlString(tokenStr)
Some(hive2Token)
doAsRealUser {
val hive = getHive.invoke(null, hiveConf)
val tokenStr = getDelegationToken.invoke(hive, currentUser.getUserName(), principal)
.asInstanceOf[String]
val hive2Token = new Token[DelegationTokenIdentifier]()
hive2Token.decodeFromUrlString(tokenStr)
Some(hive2Token)
}
} finally {
Utils.tryLogNonFatalError {
closeCurrent.invoke(null)
Expand All @@ -216,6 +221,26 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
None
}
}

/**
* Run some code as the real logged in user (which may differ from the current user, for
* example, when using proxying).
*/
private def doAsRealUser[T](fn: => T): T = {
  val ugi = UserGroupInformation.getCurrentUser()
  // When proxying, the real (logged-in) user differs from the current user; fall back
  // to the current user when no real user is set.
  val executor = Option(ugi.getRealUser()).getOrElse(ugi)

  val action = new PrivilegedExceptionAction[T]() {
    override def run(): T = fn
  }

  // For some reason the Scala-generated anonymous class ends up causing an
  // UndeclaredThrowableException, even if you annotate the method with @throws;
  // unwrap it so callers see the original failure.
  try {
    executor.doAs(action)
  } catch {
    case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e)
  }
}

}

object YarnSparkHadoopUtil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
hadoopConf.set("hive.metastore.uris", "http://localhost:0")
val util = new YarnSparkHadoopUtil
assertNestedHiveException(intercept[InvocationTargetException] {
util.obtainTokenForHiveMetastoreInner(hadoopConf, "alice")
util.obtainTokenForHiveMetastoreInner(hadoopConf)
})
// expect exception trapping code to unwind this hive-side exception
assertNestedHiveException(intercept[InvocationTargetException] {
Expand Down