Skip to content

Commit 0166c73

Browse files
gaborgsomogyiMarcelo Vanzin
authored and
Marcelo Vanzin
committed
[SPARK-25501][SS] Add kafka delegation token support.
## What changes were proposed in this pull request? It adds kafka delegation token support for structured streaming. Please see the relevant [SPIP](https://docs.google.com/document/d/1ouRayzaJf_N5VQtGhVq9FURXVmRpXzEEWYHob0ne3NY/edit?usp=sharing) What this PR contains: * Configuration parameters for the feature * Delegation token fetching from broker * Usage of token through dynamic JAAS configuration * Minor refactoring in the existing code What this PR doesn't contain: * Documentation changes because design can change ## How was this patch tested? Existing tests + added small amount of additional unit tests. Because it's an external service integration mainly tested on cluster. * 4 node cluster * Kafka broker version 1.1.0 * Topic with 4 partitions * security.protocol = SASL_SSL * sasl.mechanism = SCRAM-SHA-256 An example of obtaining a token: ``` 18/10/01 01:07:49 INFO kafka010.TokenUtil: TOKENID HMAC OWNER RENEWERS ISSUEDATE EXPIRYDATE MAXDATE 18/10/01 01:07:49 INFO kafka010.TokenUtil: D1-v__Q5T_uHx55rW16Jwg [hidden] User:user [] 2018-10-01T01:07 2018-10-02T01:07 2018-10-08T01:07 18/10/01 01:07:49 INFO security.KafkaDelegationTokenProvider: Get token from Kafka: Kind: KAFKA_DELEGATION_TOKEN, Service: kafka.server.delegation.token, Ident: 44 31 2d 76 5f 5f 51 35 54 5f 75 48 78 35 35 72 57 31 36 4a 77 67 ``` An example token usage: ``` 18/10/01 01:08:07 INFO kafka010.KafkaSecurityHelper: Scram JAAS params: org.apache.kafka.common.security.scram.ScramLoginModule required tokenauth=true serviceName="kafka" username="D1-v__Q5T_uHx55rW16Jwg" password="[hidden]"; 18/10/01 01:08:07 INFO kafka010.KafkaSourceProvider: Delegation token detected, using it for login. ``` Closes #22598 from gaborgsomogyi/SPARK-25501. Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com> Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
1 parent f97326b commit 0166c73

File tree

15 files changed

+825
-50
lines changed

15 files changed

+825
-50
lines changed

core/pom.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,19 @@
408408
<scope>provided</scope>
409409
</dependency>
410410

411+
<!--
412+
The following kafka dependency used to obtain delegation token.
413+
In order to prevent spark-core from depending on kafka, these deps have been placed in the
414+
"provided" scope, rather than the "compile" scope, and NoClassDefFoundError exceptions are
415+
handled when the user explicitly use neither spark-streaming-kafka nor spark-sql-kafka modules.
416+
-->
417+
<dependency>
418+
<groupId>org.apache.kafka</groupId>
419+
<artifactId>kafka-clients</artifactId>
420+
<version>${kafka.version}</version>
421+
<scope>provided</scope>
422+
</dependency>
423+
411424
</dependencies>
412425
<build>
413426
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,8 @@ private[spark] class HadoopDelegationTokenManager(
274274
new HadoopFSDelegationTokenProvider(
275275
() => HadoopDelegationTokenManager.this.fileSystemsToAccess())) ++
276276
safeCreateProvider(new HiveDelegationTokenProvider) ++
277-
safeCreateProvider(new HBaseDelegationTokenProvider)
277+
safeCreateProvider(new HBaseDelegationTokenProvider) ++
278+
safeCreateProvider(new KafkaDelegationTokenProvider)
278279

279280
// Filter out providers for which spark.security.credentials.{service}.enabled is false.
280281
providers
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.security
19+
20+
import scala.language.existentials
21+
import scala.util.control.NonFatal
22+
23+
import org.apache.hadoop.conf.Configuration
24+
import org.apache.hadoop.security.Credentials
25+
import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
26+
27+
import org.apache.spark.SparkConf
28+
import org.apache.spark.internal.Logging
29+
import org.apache.spark.internal.config._
30+
31+
private[security] class KafkaDelegationTokenProvider
32+
extends HadoopDelegationTokenProvider with Logging {
33+
34+
override def serviceName: String = "kafka"
35+
36+
override def obtainDelegationTokens(
37+
hadoopConf: Configuration,
38+
sparkConf: SparkConf,
39+
creds: Credentials): Option[Long] = {
40+
try {
41+
logDebug("Attempting to fetch Kafka security token.")
42+
val (token, nextRenewalDate) = KafkaTokenUtil.obtainToken(sparkConf)
43+
creds.addToken(token.getService, token)
44+
return Some(nextRenewalDate)
45+
} catch {
46+
case NonFatal(e) =>
47+
logInfo(s"Failed to get token from service $serviceName", e)
48+
}
49+
None
50+
}
51+
52+
override def delegationTokensRequired(
53+
sparkConf: SparkConf,
54+
hadoopConf: Configuration): Boolean = {
55+
val protocol = sparkConf.get(Kafka.SECURITY_PROTOCOL)
56+
sparkConf.contains(Kafka.BOOTSTRAP_SERVERS) &&
57+
(protocol == SASL_SSL.name ||
58+
protocol == SSL.name ||
59+
protocol == SASL_PLAINTEXT.name)
60+
}
61+
}
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.security
19+
20+
import java.{ util => ju }
21+
import java.text.SimpleDateFormat
22+
23+
import scala.util.control.NonFatal
24+
25+
import org.apache.hadoop.io.Text
26+
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
27+
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
28+
import org.apache.kafka.clients.CommonClientConfigs
29+
import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions}
30+
import org.apache.kafka.common.config.SaslConfigs
31+
import org.apache.kafka.common.security.JaasContext
32+
import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
33+
import org.apache.kafka.common.security.token.delegation.DelegationToken
34+
35+
import org.apache.spark.SparkConf
36+
import org.apache.spark.internal.Logging
37+
import org.apache.spark.internal.config._
38+
39+
private[spark] object KafkaTokenUtil extends Logging {
40+
val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
41+
val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
42+
43+
private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
44+
override def getKind: Text = TOKEN_KIND
45+
}
46+
47+
private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = {
48+
val adminClient = AdminClient.create(createAdminClientProperties(sparkConf))
49+
val createDelegationTokenOptions = new CreateDelegationTokenOptions()
50+
val createResult = adminClient.createDelegationToken(createDelegationTokenOptions)
51+
val token = createResult.delegationToken().get()
52+
printToken(token)
53+
54+
(new Token[KafkaDelegationTokenIdentifier](
55+
token.tokenInfo.tokenId.getBytes,
56+
token.hmacAsBase64String.getBytes,
57+
TOKEN_KIND,
58+
TOKEN_SERVICE
59+
), token.tokenInfo.expiryTimestamp)
60+
}
61+
62+
private[security] def createAdminClientProperties(sparkConf: SparkConf): ju.Properties = {
63+
val adminClientProperties = new ju.Properties
64+
65+
val bootstrapServers = sparkConf.get(Kafka.BOOTSTRAP_SERVERS)
66+
require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " +
67+
"servers not configured.")
68+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get)
69+
70+
val protocol = sparkConf.get(Kafka.SECURITY_PROTOCOL)
71+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol)
72+
protocol match {
73+
case SASL_SSL.name =>
74+
setTrustStoreProperties(sparkConf, adminClientProperties)
75+
76+
case SSL.name =>
77+
setTrustStoreProperties(sparkConf, adminClientProperties)
78+
setKeyStoreProperties(sparkConf, adminClientProperties)
79+
logWarning("Obtaining kafka delegation token with SSL protocol. Please " +
80+
"configure 2-way authentication on the broker side.")
81+
82+
case SASL_PLAINTEXT.name =>
83+
logWarning("Obtaining kafka delegation token through plain communication channel. Please " +
84+
"consider the security impact.")
85+
}
86+
87+
// There are multiple possibilities to log in and applied in the following order:
88+
// - JVM global security provided -> try to log in with JVM global security configuration
89+
// which can be configured for example with 'java.security.auth.login.config'.
90+
// For this no additional parameter needed.
91+
// - Keytab is provided -> try to log in with kerberos module and keytab using kafka's dynamic
92+
// JAAS configuration.
93+
// - Keytab not provided -> try to log in with kerberos module and ticket cache using kafka's
94+
// dynamic JAAS configuration.
95+
// Kafka client is unable to use subject from JVM which already logged in
96+
// to kdc (see KAFKA-7677)
97+
if (isGlobalJaasConfigurationProvided) {
98+
logDebug("JVM global security configuration detected, using it for login.")
99+
} else {
100+
adminClientProperties.put(SaslConfigs.SASL_MECHANISM, SaslConfigs.GSSAPI_MECHANISM)
101+
if (sparkConf.contains(KEYTAB)) {
102+
logDebug("Keytab detected, using it for login.")
103+
val jaasParams = getKeytabJaasParams(sparkConf)
104+
adminClientProperties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
105+
} else {
106+
logDebug("Using ticket cache for login.")
107+
val jaasParams = getTicketCacheJaasParams(sparkConf)
108+
adminClientProperties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
109+
}
110+
}
111+
112+
adminClientProperties
113+
}
114+
115+
def isGlobalJaasConfigurationProvided: Boolean = {
116+
try {
117+
JaasContext.loadClientContext(ju.Collections.emptyMap[String, Object]())
118+
true
119+
} catch {
120+
case NonFatal(_) => false
121+
}
122+
}
123+
124+
private def setTrustStoreProperties(sparkConf: SparkConf, properties: ju.Properties): Unit = {
125+
sparkConf.get(Kafka.TRUSTSTORE_LOCATION).foreach { truststoreLocation =>
126+
properties.put("ssl.truststore.location", truststoreLocation)
127+
}
128+
sparkConf.get(Kafka.TRUSTSTORE_PASSWORD).foreach { truststorePassword =>
129+
properties.put("ssl.truststore.password", truststorePassword)
130+
}
131+
}
132+
133+
private def setKeyStoreProperties(sparkConf: SparkConf, properties: ju.Properties): Unit = {
134+
sparkConf.get(Kafka.KEYSTORE_LOCATION).foreach { keystoreLocation =>
135+
properties.put("ssl.keystore.location", keystoreLocation)
136+
}
137+
sparkConf.get(Kafka.KEYSTORE_PASSWORD).foreach { keystorePassword =>
138+
properties.put("ssl.keystore.password", keystorePassword)
139+
}
140+
sparkConf.get(Kafka.KEY_PASSWORD).foreach { keyPassword =>
141+
properties.put("ssl.key.password", keyPassword)
142+
}
143+
}
144+
145+
private[security] def getKeytabJaasParams(sparkConf: SparkConf): String = {
146+
val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)
147+
require(serviceName.nonEmpty, "Kerberos service name must be defined")
148+
149+
val params =
150+
s"""
151+
|${getKrb5LoginModuleName} required
152+
| useKeyTab=true
153+
| serviceName="${serviceName.get}"
154+
| keyTab="${sparkConf.get(KEYTAB).get}"
155+
| principal="${sparkConf.get(PRINCIPAL).get}";
156+
""".stripMargin.replace("\n", "")
157+
logDebug(s"Krb keytab JAAS params: $params")
158+
params
159+
}
160+
161+
def getTicketCacheJaasParams(sparkConf: SparkConf): String = {
162+
val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)
163+
require(serviceName.nonEmpty, "Kerberos service name must be defined")
164+
165+
val params =
166+
s"""
167+
|${getKrb5LoginModuleName} required
168+
| useTicketCache=true
169+
| serviceName="${serviceName.get}";
170+
""".stripMargin.replace("\n", "")
171+
logDebug(s"Krb ticket cache JAAS params: $params")
172+
params
173+
}
174+
175+
/**
176+
* Krb5LoginModule package vary in different JVMs.
177+
* Please see Hadoop UserGroupInformation for further details.
178+
*/
179+
private def getKrb5LoginModuleName(): String = {
180+
if (System.getProperty("java.vendor").contains("IBM")) {
181+
"com.ibm.security.auth.module.Krb5LoginModule"
182+
} else {
183+
"com.sun.security.auth.module.Krb5LoginModule"
184+
}
185+
}
186+
187+
private def printToken(token: DelegationToken): Unit = {
188+
if (log.isDebugEnabled) {
189+
val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
190+
logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
191+
"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE"))
192+
val tokenInfo = token.tokenInfo
193+
logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
194+
tokenInfo.tokenId,
195+
tokenInfo.owner,
196+
tokenInfo.renewersAsString,
197+
dateFormat.format(tokenInfo.issueTimestamp),
198+
dateFormat.format(tokenInfo.expiryTimestamp),
199+
dateFormat.format(tokenInfo.maxTimestamp)))
200+
}
201+
}
202+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.internal.config
19+
20+
private[spark] object Kafka {
21+
22+
val BOOTSTRAP_SERVERS =
23+
ConfigBuilder("spark.kafka.bootstrap.servers")
24+
.doc("A list of coma separated host/port pairs to use for establishing the initial " +
25+
"connection to the Kafka cluster. For further details please see kafka documentation. " +
26+
"Only used to obtain delegation token.")
27+
.stringConf
28+
.createOptional
29+
30+
val SECURITY_PROTOCOL =
31+
ConfigBuilder("spark.kafka.security.protocol")
32+
.doc("Protocol used to communicate with brokers. For further details please see kafka " +
33+
"documentation. Only used to obtain delegation token.")
34+
.stringConf
35+
.createWithDefault("SASL_SSL")
36+
37+
val KERBEROS_SERVICE_NAME =
38+
ConfigBuilder("spark.kafka.sasl.kerberos.service.name")
39+
.doc("The Kerberos principal name that Kafka runs as. This can be defined either in " +
40+
"Kafka's JAAS config or in Kafka's config. For further details please see kafka " +
41+
"documentation. Only used to obtain delegation token.")
42+
.stringConf
43+
.createOptional
44+
45+
val TRUSTSTORE_LOCATION =
46+
ConfigBuilder("spark.kafka.ssl.truststore.location")
47+
.doc("The location of the trust store file. For further details please see kafka " +
48+
"documentation. Only used to obtain delegation token.")
49+
.stringConf
50+
.createOptional
51+
52+
val TRUSTSTORE_PASSWORD =
53+
ConfigBuilder("spark.kafka.ssl.truststore.password")
54+
.doc("The store password for the trust store file. This is optional for client and only " +
55+
"needed if ssl.truststore.location is configured. For further details please see kafka " +
56+
"documentation. Only used to obtain delegation token.")
57+
.stringConf
58+
.createOptional
59+
60+
val KEYSTORE_LOCATION =
61+
ConfigBuilder("spark.kafka.ssl.keystore.location")
62+
.doc("The location of the key store file. This is optional for client and can be used for " +
63+
"two-way authentication for client. For further details please see kafka documentation. " +
64+
"Only used to obtain delegation token.")
65+
.stringConf
66+
.createOptional
67+
68+
val KEYSTORE_PASSWORD =
69+
ConfigBuilder("spark.kafka.ssl.keystore.password")
70+
.doc("The store password for the key store file. This is optional for client and only " +
71+
"needed if ssl.keystore.location is configured. For further details please see kafka " +
72+
"documentation. Only used to obtain delegation token.")
73+
.stringConf
74+
.createOptional
75+
76+
val KEY_PASSWORD =
77+
ConfigBuilder("spark.kafka.ssl.key.password")
78+
.doc("The password of the private key in the key store file. This is optional for client. " +
79+
"For further details please see kafka documentation. Only used to obtain delegation token.")
80+
.stringConf
81+
.createOptional
82+
}

0 commit comments

Comments
 (0)