Skip to content

Commit 911fadf

Browse files
gaborgsomogyi authored and Marcelo Vanzin committed
[SPARK-27748][SS] Kafka consumer/producer password/token redaction.
## What changes were proposed in this pull request? Kafka parameters are logged in several places, and the following parameters have to be redacted: * Delegation token * `ssl.truststore.password` * `ssl.keystore.password` * `ssl.key.password` This PR contains: * Spark central redaction framework used to redact passwords (`spark.redaction.regex`) * Custom redaction added to handle `sasl.jaas.config` (delegation token) * Redaction code added to the consumer/producer code * Test refactor ## How was this patch tested? Existing + additional unit tests. Closes #24627 from gaborgsomogyi/SPARK-27748. Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com> Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
1 parent 8486680 commit 911fadf

File tree

8 files changed

+248
-60
lines changed

8 files changed

+248
-60
lines changed

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala

Lines changed: 17 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -28,7 +28,7 @@ import scala.util.control.NonFatal
2828

2929
import org.apache.spark.SparkEnv
3030
import org.apache.spark.internal.Logging
31-
import org.apache.spark.kafka010.KafkaConfigUpdater
31+
import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaRedactionUtil}
3232

3333
private[kafka010] object CachedKafkaProducer extends Logging {
3434

@@ -42,8 +42,7 @@ private[kafka010] object CachedKafkaProducer extends Logging {
4242

4343
private val cacheLoader = new CacheLoader[Seq[(String, Object)], Producer] {
4444
override def load(config: Seq[(String, Object)]): Producer = {
45-
val configMap = config.map(x => x._1 -> x._2).toMap.asJava
46-
createKafkaProducer(configMap)
45+
createKafkaProducer(config)
4746
}
4847
}
4948

@@ -52,8 +51,11 @@ private[kafka010] object CachedKafkaProducer extends Logging {
5251
notification: RemovalNotification[Seq[(String, Object)], Producer]): Unit = {
5352
val paramsSeq: Seq[(String, Object)] = notification.getKey
5453
val producer: Producer = notification.getValue
55-
logDebug(
56-
s"Evicting kafka producer $producer params: $paramsSeq, due to ${notification.getCause}")
54+
if (log.isDebugEnabled()) {
55+
val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq)
56+
logDebug(s"Evicting kafka producer $producer params: $redactedParamsSeq, " +
57+
s"due to ${notification.getCause}")
58+
}
5759
close(paramsSeq, producer)
5860
}
5961
}
@@ -63,9 +65,12 @@ private[kafka010] object CachedKafkaProducer extends Logging {
6365
.removalListener(removalListener)
6466
.build[Seq[(String, Object)], Producer](cacheLoader)
6567

66-
private def createKafkaProducer(producerConfiguration: ju.Map[String, Object]): Producer = {
67-
val kafkaProducer: Producer = new Producer(producerConfiguration)
68-
logDebug(s"Created a new instance of KafkaProducer for $producerConfiguration.")
68+
private def createKafkaProducer(paramsSeq: Seq[(String, Object)]): Producer = {
69+
val kafkaProducer: Producer = new Producer(paramsSeq.toMap.asJava)
70+
if (log.isDebugEnabled()) {
71+
val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq)
72+
logDebug(s"Created a new instance of KafkaProducer for $redactedParamsSeq.")
73+
}
6974
kafkaProducer
7075
}
7176

@@ -103,7 +108,10 @@ private[kafka010] object CachedKafkaProducer extends Logging {
103108
/** Auto close on cache evict */
104109
private def close(paramsSeq: Seq[(String, Object)], producer: Producer): Unit = {
105110
try {
106-
logInfo(s"Closing the KafkaProducer with params: ${paramsSeq.mkString("\n")}.")
111+
if (log.isInfoEnabled()) {
112+
val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq)
113+
logInfo(s"Closing the KafkaProducer with params: ${redactedParamsSeq.mkString("\n")}.")
114+
}
107115
producer.close()
108116
} catch {
109117
case NonFatal(e) => logWarning("Error while closing kafka producer.", e)

external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaConfigUpdater.scala

Lines changed: 10 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -36,14 +36,22 @@ private[spark] case class KafkaConfigUpdater(module: String, kafkaParams: Map[St
3636

3737
def set(key: String, value: Object): this.type = {
3838
map.put(key, value)
39-
logDebug(s"$module: Set $key to $value, earlier value: ${kafkaParams.getOrElse(key, "")}")
39+
if (log.isDebugEnabled()) {
40+
val redactedValue = KafkaRedactionUtil.redactParams(Seq((key, value))).head._2
41+
val redactedOldValue = KafkaRedactionUtil
42+
.redactParams(Seq((key, kafkaParams.getOrElse(key, "")))).head._2
43+
logDebug(s"$module: Set $key to $redactedValue, earlier value: $redactedOldValue")
44+
}
4045
this
4146
}
4247

4348
def setIfUnset(key: String, value: Object): this.type = {
4449
if (!map.containsKey(key)) {
4550
map.put(key, value)
46-
logDebug(s"$module: Set $key to $value")
51+
if (log.isDebugEnabled()) {
52+
val redactedValue = KafkaRedactionUtil.redactParams(Seq((key, value))).head._2
53+
logDebug(s"$module: Set $key to $redactedValue")
54+
}
4755
}
4856
this
4957
}
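external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaRedactionUtil.scala

Lines changed: 51 additions & 0 deletions
Original file line number | Diff line number | Diff line change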
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.kafka010
19+
20+
import org.apache.kafka.common.config.SaslConfigs
21+
22+
import org.apache.spark.SparkEnv
23+
import org.apache.spark.internal.Logging
24+
import org.apache.spark.internal.config.SECRET_REDACTION_PATTERN
25+
import org.apache.spark.util.Utils.{redact, REDACTION_REPLACEMENT_TEXT}
26+
27+
private[spark] object KafkaRedactionUtil extends Logging {
28+
private[spark] def redactParams(params: Seq[(String, Object)]): Seq[(String, String)] = {
29+
val redactionPattern = Some(SparkEnv.get.conf.get(SECRET_REDACTION_PATTERN))
30+
params.map { case (key, value) =>
31+
if (value != null) {
32+
if (key.equalsIgnoreCase(SaslConfigs.SASL_JAAS_CONFIG)) {
33+
(key, redactJaasParam(value.asInstanceOf[String]))
34+
} else {
35+
val (_, newValue) = redact(redactionPattern, Seq((key, value.toString))).head
36+
(key, newValue)
37+
}
38+
} else {
39+
(key, value.asInstanceOf[String])
40+
}
41+
}
42+
}
43+
44+
private[kafka010] def redactJaasParam(param: String): String = {
45+
if (param != null && !param.isEmpty) {
46+
param.replaceAll("password=\".*\"", s"""password="$REDACTION_REPLACEMENT_TEXT"""")
47+
} else {
48+
param
49+
}
50+
}
51+
}

external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -41,6 +41,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
4141
import org.apache.spark.internal.Logging
4242
import org.apache.spark.internal.config._
4343
import org.apache.spark.util.Utils
44+
import org.apache.spark.util.Utils.REDACTION_REPLACEMENT_TEXT
4445

4546
private[spark] object KafkaTokenUtil extends Logging {
4647
val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
@@ -193,7 +194,7 @@ private[spark] object KafkaTokenUtil extends Logging {
193194
| debug=${isGlobalKrbDebugEnabled()}
194195
| useTicketCache=true
195196
| serviceName="${clusterConf.kerberosServiceName}";
196-
""".stripMargin.replace("\n", "")
197+
""".stripMargin.replace("\n", "").trim
197198
logDebug(s"Krb ticket cache JAAS params: $params")
198199
params
199200
}
@@ -226,7 +227,8 @@ private[spark] object KafkaTokenUtil extends Logging {
226227
logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
227228
"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE"))
228229
val tokenInfo = token.tokenInfo
229-
logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
230+
logDebug("%-15s %-15s %-15s %-25s %-15s %-15s %-15s".format(
231+
REDACTION_REPLACEMENT_TEXT,
230232
tokenInfo.tokenId,
231233
tokenInfo.owner,
232234
tokenInfo.renewersAsString,
@@ -268,8 +270,8 @@ private[spark] object KafkaTokenUtil extends Logging {
268270
| serviceName="${clusterConf.kerberosServiceName}"
269271
| username="$username"
270272
| password="$password";
271-
""".stripMargin.replace("\n", "")
272-
logDebug(s"Scram JAAS params: ${params.replaceAll("password=\".*\"", "password=\"[hidden]\"")}")
273+
""".stripMargin.replace("\n", "").trim
274+
logDebug(s"Scram JAAS params: ${KafkaRedactionUtil.redactJaasParam(params)}")
273275

274276
params
275277
}

external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaConfigUpdaterSuite.scala

Lines changed: 5 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -23,13 +23,10 @@ import org.apache.kafka.common.config.SaslConfigs
2323
import org.apache.spark.SparkFunSuite
2424

2525
class KafkaConfigUpdaterSuite extends SparkFunSuite with KafkaDelegationTokenTest {
26-
private val identifier = "cluster1"
27-
private val tokenService = KafkaTokenUtil.getTokenService(identifier)
2826
private val testModule = "testModule"
2927
private val testKey = "testKey"
3028
private val testValue = "testValue"
3129
private val otherTestValue = "otherTestValue"
32-
private val bootStrapServers = "127.0.0.1:0"
3330

3431
test("set should always set value") {
3532
val params = Map.empty[String, String]
@@ -81,10 +78,10 @@ class KafkaConfigUpdaterSuite extends SparkFunSuite with KafkaDelegationTokenTes
8178
)
8279
setSparkEnv(
8380
Map(
84-
s"spark.kafka.clusters.$identifier.auth.bootstrap.servers" -> bootStrapServers
81+
s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers" -> bootStrapServers
8582
)
8683
)
87-
addTokenToUGI(tokenService)
84+
addTokenToUGI(tokenService1)
8885

8986
val updatedParams = KafkaConfigUpdater(testModule, params)
9087
.setAuthenticationConfigIfNeeded()
@@ -103,11 +100,11 @@ class KafkaConfigUpdaterSuite extends SparkFunSuite with KafkaDelegationTokenTes
103100
)
104101
setSparkEnv(
105102
Map(
106-
s"spark.kafka.clusters.$identifier.auth.bootstrap.servers" -> bootStrapServers,
107-
s"spark.kafka.clusters.$identifier.sasl.token.mechanism" -> "intentionally_invalid"
103+
s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers" -> bootStrapServers,
104+
s"spark.kafka.clusters.$identifier1.sasl.token.mechanism" -> "intentionally_invalid"
108105
)
109106
)
110-
addTokenToUGI(tokenService)
107+
addTokenToUGI(tokenService1)
111108

112109
val e = intercept[IllegalArgumentException] {
113110
KafkaConfigUpdater(testModule, params)

external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala

Lines changed: 32 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -40,6 +40,21 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach {
4040
protected val tokenId = "tokenId" + ju.UUID.randomUUID().toString
4141
protected val tokenPassword = "tokenPassword" + ju.UUID.randomUUID().toString
4242

43+
protected val identifier1 = "cluster1"
44+
protected val identifier2 = "cluster2"
45+
protected val tokenService1 = KafkaTokenUtil.getTokenService(identifier1)
46+
protected val tokenService2 = KafkaTokenUtil.getTokenService(identifier2)
47+
protected val bootStrapServers = "127.0.0.1:0"
48+
protected val matchingTargetServersRegex = "127.0.0.*:0"
49+
protected val nonMatchingTargetServersRegex = "127.0.intentionally_non_matching.*:0"
50+
protected val trustStoreLocation = "/path/to/trustStore"
51+
protected val trustStorePassword = "trustStoreSecret"
52+
protected val keyStoreLocation = "/path/to/keyStore"
53+
protected val keyStorePassword = "keyStoreSecret"
54+
protected val keyPassword = "keySecret"
55+
protected val keytab = "/path/to/keytab"
56+
protected val principal = "user@domain.com"
57+
4358
private class KafkaJaasConfiguration extends Configuration {
4459
val entry =
4560
new AppConfigurationEntry(
@@ -89,4 +104,21 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach {
89104
doReturn(conf).when(env).conf
90105
SparkEnv.set(env)
91106
}
107+
108+
protected def createClusterConf(
109+
identifier: String,
110+
securityProtocol: String): KafkaTokenClusterConf = {
111+
KafkaTokenClusterConf(
112+
identifier,
113+
bootStrapServers,
114+
KafkaTokenSparkConf.DEFAULT_TARGET_SERVERS_REGEX,
115+
securityProtocol,
116+
KafkaTokenSparkConf.DEFAULT_SASL_KERBEROS_SERVICE_NAME,
117+
Some(trustStoreLocation),
118+
Some(trustStorePassword),
119+
Some(keyStoreLocation),
120+
Some(keyStorePassword),
121+
Some(keyPassword),
122+
KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM)
123+
}
92124
}
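external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaRedactionUtilSuite.scala

Lines changed: 120 additions & 0 deletions
Original file line number | Diff line number | Diff line change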
@@ -0,0 +1,120 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.kafka010
19+
20+
import java.{util => ju}
21+
22+
import org.apache.kafka.clients.consumer.ConsumerConfig
23+
import org.apache.kafka.common.config.{SaslConfigs, SslConfigs}
24+
import org.apache.kafka.common.security.auth.SecurityProtocol.SASL_SSL
25+
import org.apache.kafka.common.serialization.StringDeserializer
26+
27+
import org.apache.spark.SparkFunSuite
28+
import org.apache.spark.internal.config.SECRET_REDACTION_PATTERN
29+
import org.apache.spark.util.Utils.REDACTION_REPLACEMENT_TEXT
30+
31+
class KafkaRedactionUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
32+
test("redactParams should give back empty parameters") {
33+
setSparkEnv(Map.empty)
34+
assert(KafkaRedactionUtil.redactParams(Seq()) === Seq())
35+
}
36+
37+
test("redactParams should give back null value") {
38+
setSparkEnv(Map.empty)
39+
val kafkaParams = Seq(
40+
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> null
41+
)
42+
43+
assert(KafkaRedactionUtil.redactParams(kafkaParams) === kafkaParams)
44+
}
45+
46+
test("redactParams should redact non String parameters") {
47+
setSparkEnv(
48+
Map(
49+
SECRET_REDACTION_PATTERN.key -> ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
50+
)
51+
)
52+
val kafkaParams = Seq(
53+
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
54+
)
55+
56+
val redactedParams = KafkaRedactionUtil.redactParams(kafkaParams).toMap
57+
58+
assert(redactedParams.size === 1)
59+
assert(redactedParams.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG).get
60+
=== REDACTION_REPLACEMENT_TEXT)
61+
}
62+
63+
test("redactParams should redact token password from parameters") {
64+
setSparkEnv(Map.empty)
65+
val groupId = "id-" + ju.UUID.randomUUID().toString
66+
addTokenToUGI(tokenService1)
67+
val clusterConf = createClusterConf(identifier1, SASL_SSL.name)
68+
val jaasParams = KafkaTokenUtil.getTokenJaasParams(clusterConf)
69+
val kafkaParams = Seq(
70+
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
71+
SaslConfigs.SASL_JAAS_CONFIG -> jaasParams
72+
)
73+
74+
val redactedParams = KafkaRedactionUtil.redactParams(kafkaParams).toMap
75+
76+
assert(redactedParams.size === 2)
77+
assert(redactedParams.get(ConsumerConfig.GROUP_ID_CONFIG).get === groupId)
78+
val redactedJaasParams = redactedParams.get(SaslConfigs.SASL_JAAS_CONFIG).get
79+
assert(redactedJaasParams.contains(tokenId))
80+
assert(!redactedJaasParams.contains(tokenPassword))
81+
}
82+
83+
test("redactParams should redact passwords from parameters") {
84+
setSparkEnv(Map.empty)
85+
val groupId = "id-" + ju.UUID.randomUUID().toString
86+
val kafkaParams = Seq(
87+
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
88+
SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG -> trustStorePassword,
89+
SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> keyStorePassword,
90+
SslConfigs.SSL_KEY_PASSWORD_CONFIG -> keyPassword
91+
)
92+
93+
val redactedParams = KafkaRedactionUtil.redactParams(kafkaParams).toMap
94+
95+
assert(redactedParams.size === 4)
96+
assert(redactedParams(ConsumerConfig.GROUP_ID_CONFIG) === groupId)
97+
assert(redactedParams(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG) === REDACTION_REPLACEMENT_TEXT)
98+
assert(redactedParams(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG) === REDACTION_REPLACEMENT_TEXT)
99+
assert(redactedParams(SslConfigs.SSL_KEY_PASSWORD_CONFIG) === REDACTION_REPLACEMENT_TEXT)
100+
}
101+
102+
test("redactJaasParam should give back null") {
103+
assert(KafkaRedactionUtil.redactJaasParam(null) === null)
104+
}
105+
106+
test("redactJaasParam should give back empty string") {
107+
assert(KafkaRedactionUtil.redactJaasParam("") === "")
108+
}
109+
110+
test("redactJaasParam should redact token password") {
111+
addTokenToUGI(tokenService1)
112+
val clusterConf = createClusterConf(identifier1, SASL_SSL.name)
113+
val jaasParams = KafkaTokenUtil.getTokenJaasParams(clusterConf)
114+
115+
val redactedJaasParams = KafkaRedactionUtil.redactJaasParam(jaasParams)
116+
117+
assert(redactedJaasParams.contains(tokenId))
118+
assert(!redactedJaasParams.contains(tokenPassword))
119+
}
120+
}

0 commit comments

Comments
 (0)