Skip to content

Commit c591632

Browse files
committed
Fix
1 parent 7e445f7 commit c591632

File tree

3 files changed

+20
-5
lines changed

3 files changed

+20
-5
lines changed

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ private[kafka010] object CachedKafkaProducer extends Logging {
6666
.build[Seq[(String, Object)], Producer](cacheLoader)
6767

6868
private def createKafkaProducer(paramsSeq: Seq[(String, Object)]): Producer = {
69-
val kafkaProducer: Producer = new Producer(paramsSeq.map(x => x._1 -> x._2).toMap.asJava)
69+
val kafkaProducer: Producer = new Producer(paramsSeq.toMap.asJava)
7070
if (log.isDebugEnabled()) {
7171
val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq)
7272
logDebug(s"Created a new instance of KafkaProducer for $redactedParamsSeq.")

external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaRedactionUtil.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,19 @@ import org.apache.spark.util.Utils.{redact, REDACTION_REPLACEMENT_TEXT}
2626

2727
private[spark] object KafkaRedactionUtil extends Logging {
2828
private[spark] def redactParams(params: Seq[(String, Object)]): Seq[(String, Object)] = {
29-
val redactionPattern = SparkEnv.get.conf.get(SECRET_REDACTION_PATTERN)
29+
val redactionPattern = Some(SparkEnv.get.conf.get(SECRET_REDACTION_PATTERN))
3030
params.map { case (key, value) =>
3131
if (key.equalsIgnoreCase(SaslConfigs.SASL_JAAS_CONFIG)) {
3232
(key, redactJaasParam(value.asInstanceOf[String]))
3333
} else {
34-
val (_, newValue) = redact(Some(redactionPattern), Seq((key, value.asInstanceOf[String])))
35-
.head
36-
(key, newValue)
34+
value match {
35+
case s: String =>
36+
val (_, newValue) = redact(redactionPattern, Seq((key, s))).head
37+
(key, newValue)
38+
39+
case _ =>
40+
(key, value)
41+
}
3742
}
3843
}
3944
}

external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaRedactionUtilSuite.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import java.{util => ju}
2222
import org.apache.kafka.clients.consumer.ConsumerConfig
2323
import org.apache.kafka.common.config.{SaslConfigs, SslConfigs}
2424
import org.apache.kafka.common.security.auth.SecurityProtocol.SASL_SSL
25+
import org.apache.kafka.common.serialization.StringDeserializer
2526

2627
import org.apache.spark.SparkFunSuite
2728
import org.apache.spark.util.Utils.REDACTION_REPLACEMENT_TEXT
@@ -32,6 +33,15 @@ class KafkaRedactionUtilSuite extends SparkFunSuite with KafkaDelegationTokenTes
3233
assert(KafkaRedactionUtil.redactParams(Seq()) === Seq())
3334
}
3435

36+
test("redactParams should give back non String parameters") {
37+
setSparkEnv(Map())
38+
val kafkaParams = Seq(
39+
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
40+
)
41+
42+
assert(KafkaRedactionUtil.redactParams(kafkaParams) === kafkaParams)
43+
}
44+
3545
test("redactParams should redact token password from parameters") {
3646
setSparkEnv(Map())
3747
val groupId = "id-" + ju.UUID.randomUUID().toString

0 commit comments

Comments (0)