
Commit 6daa783

gaborgsomogyi authored and Marcelo Vanzin committed
[SPARK-26322][SS] Add spark.kafka.sasl.token.mechanism to ease delegation token configuration.
## What changes were proposed in this pull request?

When a Kafka delegation token is obtained, a SCRAM `sasl.mechanism` has to be configured for authentication. Previously this could only be configured on the related source/sink, which is inconvenient from the user's perspective. Such granularity is not required, and the setting can be handled with one central parameter. This PR adds `spark.kafka.sasl.token.mechanism` to configure it centrally (default: `SCRAM-SHA-512`).

## How was this patch tested?

Existing unit tests + on cluster.

Closes #23274 from gaborgsomogyi/SPARK-26322.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
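For illustration, a minimal sketch (not part of this commit; the host list and the SCRAM-SHA-256 override are placeholder values) of setting the new parameter once on the application's `SparkConf`:

    import org.apache.spark.SparkConf

    // Delegation token retrieval is triggered by spark.kafka.bootstrap.servers;
    // the SASL mechanism used with the token is now configured once, centrally,
    // instead of on every Kafka source/sink option map.
    val conf = new SparkConf()
      .set("spark.kafka.bootstrap.servers", "host1:port1,host2:port2")
      .set("spark.kafka.sasl.token.mechanism", "SCRAM-SHA-256")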
1 parent 2920438 commit 6daa783

File tree: 3 files changed (+21, -147 lines)


core/src/main/scala/org/apache/spark/internal/config/Kafka.scala

Lines changed: 9 additions & 0 deletions
@@ -79,4 +79,13 @@ private[spark] object Kafka {
         "For further details please see kafka documentation. Only used to obtain delegation token.")
       .stringConf
       .createOptional
+
+  val TOKEN_SASL_MECHANISM =
+    ConfigBuilder("spark.kafka.sasl.token.mechanism")
+      .doc("SASL mechanism used for client connections with delegation token. Because SCRAM " +
+        "login module used for authentication a compatible mechanism has to be set here. " +
+        "For further details please see kafka documentation (sasl.mechanism). Only used to " +
+        "authenticate against Kafka broker with delegation token.")
+      .stringConf
+      .createWithDefault("SCRAM-SHA-512")
 }
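A hedged sketch of how this entry is read at runtime (it mirrors the lookup added in KafkaSourceProvider below; `SparkEnv.get.conf` is the runtime Spark configuration):

    import org.apache.spark.SparkEnv
    import org.apache.spark.internal.config.Kafka

    // Resolves to "SCRAM-SHA-512" unless the user overrides it with
    // --conf spark.kafka.sasl.token.mechanism=...
    val mechanism: String = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM)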

docs/structured-streaming-kafka-integration.md

Lines changed: 4 additions & 140 deletions
@@ -642,9 +642,9 @@ This way the application can be configured via Spark parameters and may not need
 configuration (Spark can use Kafka's dynamic JAAS configuration feature). For further information
 about delegation tokens, see [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token).
 
-The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers`,
+The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers` is set,
 Spark considers the following log in options, in order of preference:
-- **JAAS login configuration**
+- **JAAS login configuration**, please see example below.
 - **Keytab file**, such as,
 
     ./bin/spark-submit \
@@ -669,144 +669,8 @@ Kafka broker configuration):
 
 After obtaining delegation token successfully, Spark distributes it across nodes and renews it accordingly.
 Delegation token uses `SCRAM` login module for authentication and because of that the appropriate
-`sasl.mechanism` has to be configured on source/sink (it must match with Kafka broker configuration):
-
-<div class="codetabs">
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-
-// Setting on Kafka Source for Streaming Queries
-val df = spark
-  .readStream
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("subscribe", "topic1")
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .as[(String, String)]
-
-// Setting on Kafka Source for Batch Queries
-val df = spark
-  .read
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("subscribe", "topic1")
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .as[(String, String)]
-
-// Setting on Kafka Sink for Streaming Queries
-val ds = df
-  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .writeStream
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("topic", "topic1")
-  .start()
-
-// Setting on Kafka Sink for Batch Queries
-val ds = df
-  .selectExpr("topic1", "CAST(key AS STRING)", "CAST(value AS STRING)")
-  .write
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .save()
-
-{% endhighlight %}
-</div>
-<div data-lang="java" markdown="1">
-{% highlight java %}
-
-// Setting on Kafka Source for Streaming Queries
-Dataset<Row> df = spark
-  .readStream()
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("subscribe", "topic1")
-  .load();
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
-
-// Setting on Kafka Source for Batch Queries
-Dataset<Row> df = spark
-  .read()
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("subscribe", "topic1")
-  .load();
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
-
-// Setting on Kafka Sink for Streaming Queries
-StreamingQuery ds = df
-  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .writeStream()
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("topic", "topic1")
-  .start();
-
-// Setting on Kafka Sink for Batch Queries
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .write()
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("topic", "topic1")
-  .save();
-
-{% endhighlight %}
-</div>
-<div data-lang="python" markdown="1">
-{% highlight python %}
-
-// Setting on Kafka Source for Streaming Queries
-df = spark \
-  .readStream \
-  .format("kafka") \
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
-  .option("subscribe", "topic1") \
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-
-// Setting on Kafka Source for Batch Queries
-df = spark \
-  .read \
-  .format("kafka") \
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
-  .option("subscribe", "topic1") \
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-
-// Setting on Kafka Sink for Streaming Queries
-ds = df \
-  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
-  .writeStream \
-  .format("kafka") \
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
-  .option("topic", "topic1") \
-  .start()
-
-// Setting on Kafka Sink for Batch Queries
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
-  .write \
-  .format("kafka") \
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
-  .option("topic", "topic1") \
-  .save()
-
-{% endhighlight %}
-</div>
-</div>
+`spark.kafka.sasl.token.mechanism` (default: `SCRAM-SHA-512`) has to be configured. Also, this parameter
+must match with Kafka broker configuration.
 
 When delegation token is available on an executor it can be overridden with JAAS login configuration.
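To make the documentation change concrete, a minimal sketch (placeholder hosts and topic) of a source after this commit: the per-source `kafka.sasl.mechanism` option from the deleted examples above is no longer needed once the mechanism is set centrally:

    // Kafka source for a streaming query, relying on the delegation token
    // and the central spark.kafka.sasl.token.mechanism setting.
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1")
      .load()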

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala

Lines changed: 8 additions & 7 deletions
@@ -30,6 +30,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
 import org.apache.spark.SparkEnv
 import org.apache.spark.deploy.security.KafkaTokenUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
@@ -501,7 +502,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       // If buffer config is not set, set it to reasonable value to work around
       // buffer issues (see KAFKA-3135)
       .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
-      .setTokenJaasConfigIfNeeded()
+      .setAuthenticationConfigIfNeeded()
       .build()
 
   def kafkaParamsForExecutors(
@@ -523,7 +524,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       // If buffer config is not set, set it to reasonable value to work around
       // buffer issues (see KAFKA-3135)
       .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
-      .setTokenJaasConfigIfNeeded()
+      .setAuthenticationConfigIfNeeded()
       .build()
 
   /**
@@ -556,7 +557,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       this
     }
 
-    def setTokenJaasConfigIfNeeded(): ConfigUpdater = {
+    def setAuthenticationConfigIfNeeded(): ConfigUpdater = {
       // There are multiple possibilities to log in and applied in the following order:
       // - JVM global security provided -> try to log in with JVM global security configuration
       //   which can be configured for example with 'java.security.auth.login.config'.
@@ -568,11 +569,11 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       } else if (KafkaSecurityHelper.isTokenAvailable()) {
         logDebug("Delegation token detected, using it for login.")
         val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
-        val mechanism = kafkaParams
-          .getOrElse(SaslConfigs.SASL_MECHANISM, SaslConfigs.DEFAULT_SASL_MECHANISM)
+        set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+        val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM)
         require(mechanism.startsWith("SCRAM"),
           "Delegation token works only with SCRAM mechanism.")
-        set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+        set(SaslConfigs.SASL_MECHANISM, mechanism)
       }
       this
     }
@@ -600,7 +601,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
     ConfigUpdater("executor", specifiedKafkaParams)
       .set(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serClassName)
      .set(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serClassName)
-      .setTokenJaasConfigIfNeeded()
+      .setAuthenticationConfigIfNeeded()
       .build()
   }
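As a hedged illustration of why the mechanism lookup moved (values hypothetical): under the old code, a user who set no `sasl.mechanism` in the per-source Kafka params silently fell back to Kafka's default, GSSAPI, which then failed the SCRAM requirement; the new code reads the central Spark conf, whose default already satisfies it:

    import org.apache.kafka.common.config.SaslConfigs

    // Old resolution path: per-source params, falling back to Kafka's default.
    val kafkaParams = Map.empty[String, String]  // user configured no sasl.mechanism
    val oldMechanism =
      kafkaParams.getOrElse(SaslConfigs.SASL_MECHANISM, SaslConfigs.DEFAULT_SASL_MECHANISM)
    // oldMechanism == "GSSAPI", so require(oldMechanism.startsWith("SCRAM"), ...) throws.

    // New resolution path: central Spark conf with a SCRAM-compatible default,
    // i.e. SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM).
    val newMechanism = "SCRAM-SHA-512"
    require(newMechanism.startsWith("SCRAM"), "Delegation token works only with SCRAM mechanism.")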
