Skip to content

Commit f9b4685

Browse files
committed
[SPARK-25501][SS] Add kafka delegation token support.
1 parent f246813 commit f9b4685

File tree

9 files changed

+395
-51
lines changed

9 files changed

+395
-51
lines changed

core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ private[spark] class HadoopDelegationTokenManager(
6666
private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
6767
val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
6868
safeCreateProvider(new HiveDelegationTokenProvider) ++
69-
safeCreateProvider(new HBaseDelegationTokenProvider)
69+
safeCreateProvider(new HBaseDelegationTokenProvider) ++
70+
safeCreateProvider(new KafkaDelegationTokenProvider)
7071

7172
// Filter out providers for which spark.security.credentials.{service}.enabled is false.
7273
providers
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.security
19+
20+
import scala.reflect.runtime.universe
21+
import scala.util.control.NonFatal
22+
23+
import org.apache.hadoop.conf.Configuration
24+
import org.apache.hadoop.security.Credentials
25+
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
26+
27+
import org.apache.spark.SparkConf
28+
import org.apache.spark.internal.Logging
29+
import org.apache.spark.internal.config.{KAFKA_DELEGATION_TOKEN_ENABLED, KAFKA_SECURITY_PROTOCOL}
30+
import org.apache.spark.util.Utils
31+
32+
/**
 * Delegation token provider for the "kafka" service.
 *
 * The token itself is produced by `org.apache.spark.sql.kafka010.TokenUtil.obtainToken`, which is
 * looked up by name at runtime (presumably because this module cannot depend on the Kafka
 * connector at compile time -- confirm against the build definition).
 */
private[security] class KafkaDelegationTokenProvider
  extends HadoopDelegationTokenProvider with Logging {

  override def serviceName: String = "kafka"

  /**
   * Tries to obtain a Kafka delegation token and store it in `creds`.
   *
   * This is best effort: any non-fatal failure (class not found, reflection error, error raised
   * by the token fetch itself) is logged and swallowed.
   *
   * @param hadoopConf Hadoop configuration; not used by this provider.
   * @param sparkConf Spark configuration handed to `TokenUtil.obtainToken`.
   * @param creds credentials the obtained token is added to, keyed by the token's service.
   * @return always `None`; no renewal time is reported by this provider.
   */
  override def obtainDelegationTokens(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    try {
      // Use the class loader directly; wrapping it in a scala-reflect runtime mirror only to
      // read the same loader back adds nothing.
      val obtainToken = Utils.getContextOrSparkClassLoader
        .loadClass("org.apache.spark.sql.kafka010.TokenUtil")
        .getMethod("obtainToken", classOf[SparkConf])

      logDebug("Attempting to fetch Kafka security token.")
      // A null receiver is passed, so obtainToken is invoked as a static method.
      val token = obtainToken.invoke(null, sparkConf)
        .asInstanceOf[Token[_ <: TokenIdentifier]]
      logInfo(s"Get token from Kafka: ${token.toString}")
      creds.addToken(token.getService, token)
    } catch {
      case NonFatal(e) =>
        // Logged as a warning rather than at debug level: a failure here means no Kafka token
        // was stored, and that should be visible without enabling debug logging.
        logWarning(s"Failed to get token from service $serviceName", e)
    }

    None
  }

  /**
   * Tokens are required only when Kafka delegation tokens are enabled and the configured Kafka
   * security protocol starts with "SASL".
   */
  override def delegationTokensRequired(
      sparkConf: SparkConf,
      hadoopConf: Configuration): Boolean = {
    sparkConf.get(KAFKA_DELEGATION_TOKEN_ENABLED) &&
      sparkConf.get(KAFKA_SECURITY_PROTOCOL).startsWith("SASL")
  }
}

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -647,4 +647,42 @@ package object config {
647647
.stringConf
648648
.toSequence
649649
.createWithDefault(Nil)
650+
651+
// Settings used for obtaining Kafka delegation tokens. Apart from the enable flag, each entry
// mirrors a Kafka client property of the same name without the "spark.kafka." prefix.
private[spark] val KAFKA_DELEGATION_TOKEN_ENABLED =
  ConfigBuilder("spark.kafka.delegation.token.enabled")
    .doc("Set to 'true' for obtaining delegation token from kafka.")
    .booleanConf
    .createWithDefault(false)

private[spark] val KAFKA_BOOTSTRAP_SERVERS =
  ConfigBuilder("spark.kafka.bootstrap.servers")
    .doc("A list of comma separated host/port pairs to use for establishing the initial " +
      "connection to the Kafka cluster. For further details please see kafka documentation.")
    .stringConf
    .createOptional

private[spark] val KAFKA_SECURITY_PROTOCOL =
  ConfigBuilder("spark.kafka.security.protocol")
    .doc("Protocol used to communicate with brokers. For further details please see kafka " +
      "documentation.")
    .stringConf
    .createWithDefault("SASL_SSL")

private[spark] val KAFKA_KERBEROS_SERVICE_NAME =
  ConfigBuilder("spark.kafka.sasl.kerberos.service.name")
    .doc("The Kerberos principal name that Kafka runs as. This can be defined either in " +
      "Kafka's JAAS config or in Kafka's config. For further details please see kafka " +
      "documentation.")
    .stringConf
    .createOptional

private[spark] val KAFKA_TRUSTSTORE_LOCATION =
  ConfigBuilder("spark.kafka.ssl.truststore.location")
    .doc("The location of the trust store file. For further details please see kafka " +
      "documentation.")
    .stringConf
    .createOptional

// The description refers to the trust store: this entry is the password that belongs to
// spark.kafka.ssl.truststore.location, not to a key store.
private[spark] val KAFKA_TRUSTSTORE_PASSWORD =
  ConfigBuilder("spark.kafka.ssl.truststore.password")
    .doc("The store password for the trust store file. This is optional for client and only " +
      "needed if ssl.truststore.location is configured. For further details please see kafka " +
      "documentation.")
    .stringConf
    .createOptional
650688
}

core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.apache.hadoop.security.Credentials
2424
import org.scalatest.Matchers
2525

2626
import org.apache.spark.{SparkConf, SparkFunSuite}
27+
import org.apache.spark.internal.config._
2728

2829
class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
2930
private var delegationTokenManager: HadoopDelegationTokenManager = null
@@ -46,6 +47,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
4647
delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should not be (None)
4748
delegationTokenManager.getServiceDelegationTokenProvider("hbase") should not be (None)
4849
delegationTokenManager.getServiceDelegationTokenProvider("hive") should not be (None)
50+
delegationTokenManager.getServiceDelegationTokenProvider("kafka") should not be (None)
4951
delegationTokenManager.getServiceDelegationTokenProvider("bogus") should be (None)
5052
}
5153

@@ -81,7 +83,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
8183
hadoopFSsToAccess)
8284
val creds = new Credentials()
8385

84-
// Tokens cannot be obtained from HDFS, Hive, HBase in unit tests.
86+
// Tokens cannot be obtained from HDFS, Hive, HBase, Kafka in unit tests.
8587
delegationTokenManager.obtainDelegationTokens(hadoopConf, creds)
8688
val tokens = creds.getAllTokens
8789
tokens.size() should be (0)
@@ -111,6 +113,17 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
111113
creds.getAllTokens.size should be (0)
112114
}
113115

116+
test("Obtain tokens For Kafka") {
  // Turn the Kafka delegation token flag on in the suite's SparkConf before calling the provider.
  sparkConf.set(KAFKA_DELEGATION_TOKEN_ENABLED, true)

  val credentials = new Credentials()
  val provider = new KafkaDelegationTokenProvider()
  provider.obtainDelegationTokens(new Configuration(), sparkConf, credentials)

  // The provider must not fail, and nothing may have been added to the credentials.
  val fetchedTokens = credentials.getAllTokens
  fetchedTokens.size should be (0)
}
126+
114127
test("SPARK-23209: obtain tokens when Hive classes are not available") {
115128
// This test needs a custom class loader to hide Hive classes which are in the classpath.
116129
// Because the manager code loads the Hive provider directly instead of using reflection, we
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.kafka010
19+
20+
import org.apache.hadoop.security.UserGroupInformation
21+
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
22+
23+
import org.apache.spark.SparkConf
24+
import org.apache.spark.internal.Logging
25+
import org.apache.spark.internal.config._
26+
27+
/**
 * Builds JAAS login module parameter strings for Kafka, either for a Kerberos keytab login or
 * for a SCRAM login backed by a delegation token.
 */
private[kafka010] object KafkaSecurityHelper extends Logging {

  /**
   * Returns the Kerberos JAAS parameters when a keytab is configured, `None` otherwise.
   */
  def getKeytabJaasParams(sparkConf: SparkConf): Option[String] = {
    sparkConf.get(KEYTAB).map(_ => getKrbJaasParams(sparkConf))
  }

  /**
   * Builds the Krb5LoginModule JAAS parameters from the configured Kerberos service name, keytab
   * and principal.
   *
   * @throws IllegalArgumentException if the service name, keytab or principal is not defined
   *                                  (checked in that order).
   */
  def getKrbJaasParams(sparkConf: SparkConf): String = {
    val kerberosService = sparkConf.get(KAFKA_KERBEROS_SERVICE_NAME)
    require(kerberosService.isDefined, "Kerberos service name must be defined")
    val keytabPath = sparkConf.get(KEYTAB)
    require(keytabPath.isDefined, "Keytab must be defined")
    val krbPrincipal = sparkConf.get(PRINCIPAL)
    require(krbPrincipal.isDefined, "Principal must be defined")

    // The margin-stripped block is collapsed to one line by removing the line breaks.
    val jaasParams =
      s"""
      |com.sun.security.auth.module.Krb5LoginModule required
      | useKeyTab=true
      | serviceName="${kerberosService.get}"
      | keyTab="${keytabPath.get}"
      | principal="${krbPrincipal.get}";
      """.stripMargin.replace("\n", "")
    logInfo(s"Krb JAAS params: $jaasParams")

    jaasParams
  }

  /**
   * Returns the SCRAM JAAS parameters when the current user's credentials hold a token for
   * `TokenUtil.TOKEN_SERVICE`, `None` otherwise.
   */
  def getTokenJaasParams(sparkConf: SparkConf): Option[String] = {
    val currentCredentials = UserGroupInformation.getCurrentUser().getCredentials
    // getToken yields null when there is no token for the service; Option(...) maps that to None.
    Option(currentCredentials.getToken(TokenUtil.TOKEN_SERVICE))
      .map(kafkaToken => getScramJaasParams(sparkConf, kafkaToken))
  }

  /**
   * Builds the ScramLoginModule JAAS parameters, using the token identifier as the username and
   * the token password as the password.
   *
   * @throws IllegalArgumentException if the Kerberos service name is not defined.
   */
  private def getScramJaasParams(
      sparkConf: SparkConf, token: Token[_ <: TokenIdentifier]): String = {
    val kerberosService = sparkConf.get(KAFKA_KERBEROS_SERVICE_NAME)
    require(kerberosService.isDefined, "Kerberos service name must be defined")
    val username = new String(token.getIdentifier)
    val password = new String(token.getPassword)

    val jaasParams =
      s"""
      |org.apache.kafka.common.security.scram.ScramLoginModule required
      | tokenauth=true
      | serviceName="${kerberosService.get}"
      | username="$username"
      | password="$password";
      """.stripMargin.replace("\n", "")
    // Only the redacted form is logged; the returned value still carries the real password.
    val redactedParams = jaasParams.replaceAll("password=\".*\"", "password=\"[hidden]\"")
    logInfo(s"Scram JAAS params: $redactedParams")

    jaasParams
  }
}

0 commit comments

Comments
 (0)