Skip to content

Commit e847ab0

Browse files
author
Marcelo Vanzin
committed
[SPARK-13478][YARN] Use real user when fetching delegation tokens.
The Hive client library is not smart enough to notice that the current user is a proxy user; so when using a proxy user, it fails to fetch delegation tokens from the metastore because of a missing kerberos TGT for the current user. To fix it, just run the code that fetches the delegation token as the real logged-in user.

Tested on a kerberos cluster both submitting normally and with a proxy user; Hive and HBase tokens are retrieved correctly in both cases.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #11358 from vanzin/SPARK-13478.

(cherry picked from commit c7fccb5)
1 parent 2303887 commit e847ab0

File tree

3 files changed

+42
-12
lines changed

3 files changed

+42
-12
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
252252
"either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
253253
}
254254
}
255+
256+
if (proxyUser != null && principal != null) {
257+
SparkSubmit.printErrorAndExit("Only one of --proxy-user or --principal can be provided.")
258+
}
255259
}
256260

257261
private def validateKillArguments(): Unit = {
@@ -514,6 +518,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
514518
| --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).
515519
|
516520
| --proxy-user NAME User to impersonate when submitting the application.
521+
| This argument does not work with --principal / --keytab.
517522
|
518523
| --help, -h Show this help message and exit
519524
| --verbose, -v Print additional debug output

yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
package org.apache.spark.deploy.yarn
1919

2020
import java.io.File
21+
import java.lang.reflect.UndeclaredThrowableException
2122
import java.nio.charset.StandardCharsets.UTF_8
23+
import java.security.PrivilegedExceptionAction
2224
import java.util.regex.Matcher
2325
import java.util.regex.Pattern
2426

@@ -156,7 +158,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
156158
*/
157159
def obtainTokenForHiveMetastore(conf: Configuration): Option[Token[DelegationTokenIdentifier]] = {
158160
try {
159-
obtainTokenForHiveMetastoreInner(conf, UserGroupInformation.getCurrentUser().getUserName)
161+
obtainTokenForHiveMetastoreInner(conf)
160162
} catch {
161163
case e: ClassNotFoundException =>
162164
logInfo(s"Hive class not found $e")
@@ -171,8 +173,8 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
171173
* @param username the username of the principal requesting the delegating token.
172174
* @return a delegation token
173175
*/
174-
private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration,
175-
username: String): Option[Token[DelegationTokenIdentifier]] = {
176+
private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration):
177+
Option[Token[DelegationTokenIdentifier]] = {
176178
val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
177179

178180
// the hive configuration class is a subclass of Hadoop Configuration, so can be cast down
@@ -187,11 +189,12 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
187189

188190
// Check for local metastore
189191
if (metastoreUri.nonEmpty) {
190-
require(username.nonEmpty, "Username undefined")
191192
val principalKey = "hive.metastore.kerberos.principal"
192193
val principal = hiveConf.getTrimmed(principalKey, "")
193194
require(principal.nonEmpty, "Hive principal $principalKey undefined")
194-
logDebug(s"Getting Hive delegation token for $username against $principal at $metastoreUri")
195+
val currentUser = UserGroupInformation.getCurrentUser()
196+
logDebug(s"Getting Hive delegation token for ${currentUser.getUserName()} against " +
197+
s"$principal at $metastoreUri")
195198
val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
196199
val closeCurrent = hiveClass.getMethod("closeCurrent")
197200
try {
@@ -200,12 +203,14 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
200203
classOf[String], classOf[String])
201204
val getHive = hiveClass.getMethod("get", hiveConfClass)
202205

203-
// invoke
204-
val hive = getHive.invoke(null, hiveConf)
205-
val tokenStr = getDelegationToken.invoke(hive, username, principal).asInstanceOf[String]
206-
val hive2Token = new Token[DelegationTokenIdentifier]()
207-
hive2Token.decodeFromUrlString(tokenStr)
208-
Some(hive2Token)
206+
doAsRealUser {
207+
val hive = getHive.invoke(null, hiveConf)
208+
val tokenStr = getDelegationToken.invoke(hive, currentUser.getUserName(), principal)
209+
.asInstanceOf[String]
210+
val hive2Token = new Token[DelegationTokenIdentifier]()
211+
hive2Token.decodeFromUrlString(tokenStr)
212+
Some(hive2Token)
213+
}
209214
} finally {
210215
Utils.tryLogNonFatalError {
211216
closeCurrent.invoke(null)
@@ -216,6 +221,26 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
216221
None
217222
}
218223
}
224+
225+
/**
226+
* Run some code as the real logged in user (which may differ from the current user, for
227+
* example, when using proxying).
228+
*/
229+
private def doAsRealUser[T](fn: => T): T = {
230+
val currentUser = UserGroupInformation.getCurrentUser()
231+
val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser)
232+
233+
// For some reason the Scala-generated anonymous class ends up causing an
234+
// UndeclaredThrowableException, even if you annotate the method with @throws.
235+
try {
236+
realUser.doAs(new PrivilegedExceptionAction[T]() {
237+
override def run(): T = fn
238+
})
239+
} catch {
240+
case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e)
241+
}
242+
}
243+
219244
}
220245

221246
object YarnSparkHadoopUtil {

yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
257257
hadoopConf.set("hive.metastore.uris", "http://localhost:0")
258258
val util = new YarnSparkHadoopUtil
259259
assertNestedHiveException(intercept[InvocationTargetException] {
260-
util.obtainTokenForHiveMetastoreInner(hadoopConf, "alice")
260+
util.obtainTokenForHiveMetastoreInner(hadoopConf)
261261
})
262262
// expect exception trapping code to unwind this hive-side exception
263263
assertNestedHiveException(intercept[InvocationTargetException] {

0 commit comments

Comments (0)