[SPARK-26236][SS] Add kafka delegation token support documentation. #23195

216 changes: 206 additions & 10 deletions docs/structured-streaming-kafka-integration.md
@@ -66,26 +66,26 @@ Dataset<Row> df = spark
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .load();
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to multiple topics
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .load();
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to a pattern
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .load();
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

{% endhighlight %}
</div>
@@ -479,15 +479,15 @@ StreamingQuery ds = df
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
-  .start()
+  .start();

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
StreamingQuery ds = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .start()
+  .start();

{% endhighlight %}
</div>
@@ -547,14 +547,14 @@ df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
-  .save()
+  .save();

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .save()
+  .save();

{% endhighlight %}
</div>
@@ -624,3 +624,199 @@ For experimenting on `spark-shell`, you can also use `--packages` to add `spark-

See [Application Submission Guide](submitting-applications.html) for more details about submitting
applications with external dependencies.

## Security

Kafka 0.9.0.0 introduced several features that increase security in a cluster. For a detailed
description of these possibilities, see the [Kafka security docs](http://kafka.apache.org/documentation.html#security).

It's worth noting that security is optional and turned off by default.

Spark supports the following ways to authenticate against a Kafka cluster:
- **Delegation token (introduced in Kafka broker 1.1.0)**
- **JAAS login configuration**

### Delegation token

This way the application can be configured via Spark parameters and may not need a JAAS login
configuration (Spark can use Kafka's dynamic JAAS configuration feature). For further information
about delegation tokens, see the [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token).

The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers` is set,
Spark considers the following login options, in order of preference:
- **JAAS login configuration**
- **Keytab file**, such as,

./bin/spark-submit \
--keytab <KEYTAB_FILE> \
--principal <PRINCIPAL> \
--conf spark.kafka.bootstrap.servers=<KAFKA_SERVERS> \
...

- **Kerberos credential cache**, such as,

./bin/spark-submit \
--conf spark.kafka.bootstrap.servers=<KAFKA_SERVERS> \
...

The Kafka delegation token provider can be turned off by setting `spark.security.credentials.kafka.enabled` to `false` (default: `true`).
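
For example, the provider can be disabled at submit time:

    ./bin/spark-submit \
      --conf spark.security.credentials.kafka.enabled=false \
      ...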

Spark can be configured to use the following authentication protocols to obtain the token (the
protocol must match the Kafka broker configuration):
- **SASL_SSL (default)**
- **SSL**
- **SASL_PLAINTEXT (for testing)**
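
As an illustration (the listener address below is a placeholder, not part of this change), a broker
matching the default `SASL_SSL` protocol and the `SCRAM-SHA-512` mechanism would carry
`server.properties` entries along these lines:

    listeners=SASL_SSL://kafka1:9093
    sasl.enabled.mechanisms=SCRAM-SHA-512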

After obtaining the delegation token successfully, Spark distributes it across nodes and renews it as needed.
The delegation token uses the `SCRAM` login module for authentication and because of that the appropriate
`sasl.mechanism` has to be configured on the source/sink (it must match the Kafka broker configuration):

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}

// Setting on Kafka Source for Streaming Queries
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("kafka.sasl.mechanism", "SCRAM-SHA-512")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]

// Setting on Kafka Source for Batch Queries
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("kafka.sasl.mechanism", "SCRAM-SHA-512")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]

// Setting on Kafka Sink for Streaming Queries
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("kafka.sasl.mechanism", "SCRAM-SHA-512")
.option("topic", "topic1")
.start()

// Setting on Kafka Sink for Batch Queries
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("kafka.sasl.mechanism", "SCRAM-SHA-512")
.save()

{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}

// Setting on Kafka Source for Streaming Queries
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("kafka.sasl.mechanism", "SCRAM-SHA-512")
.option("subscribe", "topic1")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Setting on Kafka Source for Batch Queries
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("kafka.sasl.mechanism", "SCRAM-SHA-512")
.option("subscribe", "topic1")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Setting on Kafka Sink for Streaming Queries
StreamingQuery ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("kafka.sasl.mechanism", "SCRAM-SHA-512")
.option("topic", "topic1")
.start();

// Setting on Kafka Sink for Batch Queries
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("kafka.sasl.mechanism", "SCRAM-SHA-512")
.option("topic", "topic1")
.save();

{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}

# Setting on Kafka Source for Streaming Queries
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
.option("subscribe", "topic1") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Setting on Kafka Source for Batch Queries
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
.option("subscribe", "topic1") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Setting on Kafka Sink for Streaming Queries
ds = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
.option("topic", "topic1") \
.start()

# Setting on Kafka Sink for Batch Queries
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
.option("topic", "topic1") \
.save()

{% endhighlight %}
</div>
</div>

When a delegation token is available on an executor, it can be overridden with a JAAS login configuration.
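
As a minimal sketch (the credentials below are illustrative, not part of this change), such an
override can be passed to a source through the `kafka.` prefixed `sasl.jaas.config` Kafka client property:

{% highlight scala %}

// Override the delegation token with explicit SCRAM credentials
// (username/password here are placeholders)
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
  .option("kafka.sasl.jaas.config",
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    "username=\"user1\" password=\"password\";")
  .option("subscribe", "topic1")
  .load()

{% endhighlight %}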

### JAAS login configuration

A JAAS login configuration must be placed on all nodes where Spark tries to access the Kafka cluster.
This makes it possible to apply any custom authentication logic, at a higher maintenance cost.
It can be done in several ways; one possibility is to provide additional JVM parameters, such as,

./bin/spark-submit \
--driver-java-options "-Djava.security.auth.login.config=/path/to/custom_jaas.conf" \
--conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/custom_jaas.conf \
...
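
For reference, a `custom_jaas.conf` for a Kerberos secured cluster might contain a `KafkaClient`
section like the following (the keytab path and principal are placeholders):

    KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      storeKey=true
      keyTab="/path/to/kafka_client.keytab"
      principal="user@EXAMPLE.COM";
    };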