
Commit bafc7ac

[SPARK-26350][SS] Allow to override group id of the Kafka consumer
## What changes were proposed in this pull request?

This PR allows the user to override `kafka.group.id` for better monitoring or security. The user needs to make sure that no two concurrently running queries or sources use the same group id. It also fixes a bug where the `groupIdPrefix` option could not be retrieved.

## How was this patch tested?

The newly added unit tests.

Closes apache#23301 from zsxwing/SPARK-26350.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
1 parent 115fecf commit bafc7ac

8 files changed (+106, -16 lines)

docs/structured-streaming-kafka-integration.md

Lines changed: 22 additions & 3 deletions
@@ -379,7 +379,25 @@ The following configurations are optional:
   <td>string</td>
   <td>spark-kafka-source</td>
   <td>streaming and batch</td>
-  <td>Prefix of consumer group identifiers (`group.id`) that are generated by structured streaming queries</td>
+  <td>Prefix of consumer group identifiers (`group.id`) that are generated by structured streaming
+  queries. If "kafka.group.id" is set, this option will be ignored.</td>
+</tr>
+<tr>
+  <td>kafka.group.id</td>
+  <td>string</td>
+  <td>none</td>
+  <td>streaming and batch</td>
+  <td>The Kafka group id to use in the Kafka consumer while reading from Kafka. Use this with caution.
+  By default, each query generates a unique group id for reading data. This ensures that each Kafka
+  source has its own consumer group that does not face interference from any other consumer, and
+  therefore can read all of the partitions of its subscribed topics. In some scenarios (for example,
+  Kafka group-based authorization), you may want to use a specific authorized group id to read data.
+  You can optionally set the group id. However, do this with extreme caution as it can cause
+  unexpected behavior. Concurrently running queries (both batch and streaming) or sources with the
+  same group id are likely to interfere with each other, causing each query to read only part of the
+  data. This may also occur when queries are started/restarted in quick succession. To minimize such
+  issues, set the Kafka consumer session timeout (by setting option "kafka.session.timeout.ms") to
+  be very small. When this is set, option "groupIdPrefix" will be ignored.</td>
 </tr>
 </table>

@@ -592,8 +610,9 @@ for parameters related to writing data.
 Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:

 - **group.id**: Kafka source will create a unique group id for each query automatically. The user can
-  set the prefix of the automatically generated group.id's via the optional source option `groupIdPrefix`, default value
-  is "spark-kafka-source".
+  set the prefix of the automatically generated group.id's via the optional source option `groupIdPrefix`,
+  default value is "spark-kafka-source". You can also set "kafka.group.id" to force Spark to use a special
+  group id; however, please read the warnings for this option and use it with caution.
 - **auto.offset.reset**: Set the source option `startingOffsets` to specify
   where to start instead. Structured Streaming manages which offsets are consumed internally, rather
   than rely on the kafka Consumer to do it. This will ensure that no data is missed when new
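
For context, here is a minimal usage sketch of the `kafka.group.id` option documented above. The broker address, topic name, and group id are placeholder values, not taken from this patch:

```scala
// Hypothetical usage sketch of the "kafka.group.id" option added in this patch.
// Broker address, topic name, and group id are placeholders.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-group-id-demo").getOrCreate()

val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker-1:9092")     // placeholder brokers
  .option("subscribe", "events")                          // placeholder topic
  .option("kafka.group.id", "authorized-analytics-group") // pre-authorized consumer group
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
```

Per the documentation added above, `groupIdPrefix` would be ignored in this configuration, and no other query or source should share the same group id.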

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala

Lines changed: 8 additions & 2 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010
 import java.{util => ju}
 import java.util.concurrent.TimeoutException

-import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetOutOfRangeException}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, OffsetOutOfRangeException}
 import org.apache.kafka.common.TopicPartition

 import org.apache.spark.TaskContext
@@ -167,7 +167,13 @@ class KafkaContinuousScanConfigBuilder(

     val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
     if (deletedPartitions.nonEmpty) {
-      reportDataLoss(s"Some partitions were deleted: $deletedPartitions")
+      val message = if (
+          offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+        s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}"
+      } else {
+        s"$deletedPartitions are gone. Some data may have been missed."
+      }
+      reportDataLoss(message)
     }

     val startOffsets = newPartitionOffsets ++

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala

Lines changed: 8 additions & 1 deletion
@@ -22,6 +22,7 @@ import java.io._
 import java.nio.charset.StandardCharsets

 import org.apache.commons.io.IOUtils
+import org.apache.kafka.clients.consumer.ConsumerConfig

 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
@@ -122,7 +123,13 @@ private[kafka010] class KafkaMicroBatchReadSupport(
     // Find deleted partitions, and report data loss if required
     val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
     if (deletedPartitions.nonEmpty) {
-      reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
+      val message =
+        if (kafkaOffsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+          s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}"
+        } else {
+          s"$deletedPartitions are gone. Some data may have been missed."
+        }
+      reportDataLoss(message)
     }

     // Use the end partitions to calculate offset ranges to ignore partitions that have

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala

Lines changed: 4 additions & 2 deletions
@@ -46,7 +46,7 @@ import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
  */
 private[kafka010] class KafkaOffsetReader(
     consumerStrategy: ConsumerStrategy,
-    driverKafkaParams: ju.Map[String, Object],
+    val driverKafkaParams: ju.Map[String, Object],
     readerOptions: Map[String, String],
     driverGroupIdPrefix: String) extends Logging {
   /**
@@ -82,7 +82,9 @@ private[kafka010] class KafkaOffsetReader(
     assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
     if (_consumer == null) {
       val newKafkaParams = new ju.HashMap[String, Object](driverKafkaParams)
-      newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId())
+      if (driverKafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG) == null) {
+        newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId())
+      }
       _consumer = consumerStrategy.createConsumer(newKafkaParams)
     }
     _consumer
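
The consumer-creation change above only falls back to a generated group id when the user has not supplied one. A standalone sketch of that precedence follows; the helper and its `nextGroupId` parameter are illustrative stand-ins, not part of Spark's API:

```scala
// Illustrative sketch of the group-id precedence in KafkaOffsetReader.createConsumer:
// a user-supplied "kafka.group.id" in the driver params is left untouched, otherwise
// the generated group id is applied before the consumer is created.
import java.{util => ju}
import org.apache.kafka.clients.consumer.ConsumerConfig

def consumerParams(
    driverKafkaParams: ju.Map[String, Object],
    nextGroupId: () => String): ju.Map[String, Object] = {
  val newKafkaParams = new ju.HashMap[String, Object](driverKafkaParams)
  if (driverKafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG) == null) {
    // No user-provided group id: use the generated one.
    newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId())
  }
  newKafkaParams
}
```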

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala

Lines changed: 7 additions & 1 deletion
@@ -22,6 +22,7 @@ import java.io._
 import java.nio.charset.StandardCharsets

 import org.apache.commons.io.IOUtils
+import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.TopicPartition

 import org.apache.spark.SparkContext
@@ -242,7 +243,12 @@ private[kafka010] class KafkaSource(

     val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
     if (deletedPartitions.nonEmpty) {
-      reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
+      val message = if (kafkaReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+        s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}"
+      } else {
+        s"$deletedPartitions are gone. Some data may have been missed."
+      }
+      reportDataLoss(message)
     }

     // Use the until partitions to calculate offset ranges to ignore partitions that have

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala

Lines changed: 18 additions & 6 deletions
@@ -335,9 +335,11 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     // Validate user-specified Kafka options

     if (caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.GROUP_ID_CONFIG}")) {
-      throw new IllegalArgumentException(
-        s"Kafka option '${ConsumerConfig.GROUP_ID_CONFIG}' is not supported as " +
-          s"user-specified consumer groups are not used to track offsets.")
+      logWarning(CUSTOM_GROUP_ID_ERROR_MESSAGE)
+      if (caseInsensitiveParams.contains(GROUP_ID_PREFIX)) {
+        logWarning("Option 'groupIdPrefix' will be ignored as " +
+          s"option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' has been set.")
+      }
     }

     if (caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG}")) {
@@ -440,6 +442,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
   private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
   private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
   private val MIN_PARTITIONS_OPTION_KEY = "minpartitions"
+  private val GROUP_ID_PREFIX = "groupidprefix"

   val TOPIC_OPTION_KEY = "topic"

@@ -459,7 +462,16 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       | source option "failOnDataLoss" to "false".
     """.stripMargin

-
+  val CUSTOM_GROUP_ID_ERROR_MESSAGE =
+    s"""Kafka option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' has been set on this query, it is
+       | not recommended to set this option. This option is unsafe to use since multiple concurrent
+       | queries or sources using the same group id will interfere with each other as they are part
+       | of the same consumer group. Restarted queries may also suffer interference from the
+       | previous run having the same group id. The user should have only one query per group id,
+       | and/or set the option 'kafka.session.timeout.ms' to be very small so that the Kafka
+       | consumers from the previous query are marked dead by the Kafka group coordinator before the
+       | restarted query starts running.
+    """.stripMargin

   private val serClassName = classOf[ByteArraySerializer].getName
   private val deserClassName = classOf[ByteArrayDeserializer].getName
@@ -510,7 +522,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")

       // So that consumers in executors do not mess with any existing group id
-      .set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-executor")
+      .setIfUnset(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-executor")

       // So that consumers in executors does not commit offsets unnecessarily
       .set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@@ -529,7 +541,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       parameters: Map[String, String],
       metadataPath: String): String = {
     val groupIdPrefix = parameters
-      .getOrElse("groupIdPrefix", "spark-kafka-source")
+      .getOrElse(GROUP_ID_PREFIX, "spark-kafka-source")
     s"${groupIdPrefix}-${UUID.randomUUID}-${metadataPath.hashCode}"
   }
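
Taken together, the provider changes imply the following effective group-id resolution. This is a hedged sketch rather than Spark's internal API: a user-supplied `kafka.group.id` wins (with a warning logged), while `groupIdPrefix` only affects the generated ids:

```scala
// Illustrative sketch (not Spark's internal API) of the effective consumer group id
// after this patch.
import java.util.UUID

def effectiveGroupId(options: Map[String, String], metadataPath: String): String =
  options.get("kafka.group.id") match {
    case Some(userGroupId) =>
      // User override: used as-is; the provider logs a warning and ignores "groupIdPrefix".
      userGroupId
    case None =>
      // Default: a unique, per-query group id derived from the (optional) prefix.
      val prefix = options.getOrElse("groupIdPrefix", "spark-kafka-source")
      s"$prefix-${UUID.randomUUID}-${metadataPath.hashCode}"
  }

// e.g. effectiveGroupId(Map("kafka.group.id" -> "custom"), "/checkpoints/q1") == "custom"
```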

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala

Lines changed: 27 additions & 1 deletion
@@ -629,6 +629,33 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     )
   }

+  test("allow group.id override") {
+    // Tests code path KafkaSourceProvider.{sourceSchema(.), createSource(.)}
+    // as well as KafkaOffsetReader.createConsumer(.)
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (1 to 10).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (11 to 20).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2))
+
+    val dsKafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.group.id", "id-" + Random.nextInt())
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(dsKafka)(
+      makeSureGetOffsetCalled,
+      CheckAnswer(1 to 30: _*)
+    )
+  }
+
   test("ensure stream-stream self-join generates only one offset in log and correct metrics") {
     val topic = newTopic()
     testUtils.createTopic(topic, partitions = 2)
@@ -1233,7 +1260,6 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
       assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("not supported"))
     }

-    testUnsupportedConfig("kafka.group.id")
     testUnsupportedConfig("kafka.auto.offset.reset")
     testUnsupportedConfig("kafka.enable.auto.commit")
     testUnsupportedConfig("kafka.interceptor.classes")

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala

Lines changed: 12 additions & 0 deletions
@@ -239,6 +239,18 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
     testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty")
   }

+  test("allow group.id overriding") {
+    // Tests code path KafkaSourceProvider.createRelation(.)
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (1 to 10).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (11 to 20).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2))
+
+    val df = createDF(topic, withOptions = Map("kafka.group.id" -> "custom"))
+    checkAnswer(df, (1 to 30).map(_.toString).toDF())
+  }
+
   test("read Kafka transactional messages: read_committed") {
     val topic = newTopic()
     testUtils.createTopic(topic)
