
Commit fccfee1

allow to override group id
1 parent 570b8f3 commit fccfee1

8 files changed (+128, -14 lines changed)

docs/structured-streaming-kafka-integration.md

Lines changed: 22 additions & 3 deletions
@@ -379,7 +379,25 @@ The following configurations are optional:
   <td>string</td>
   <td>spark-kafka-source</td>
   <td>streaming and batch</td>
-  <td>Prefix of consumer group identifiers (`group.id`) that are generated by structured streaming queries</td>
+  <td>Prefix of consumer group identifiers (`group.id`) that are generated by structured streaming
+  queries. If "kafka.group.id" is set, this option will be ignored.</td>
 </tr>
+<tr>
+  <td>kafka.group.id</td>
+  <td>string</td>
+  <td>none</td>
+  <td>streaming and batch</td>
+  <td>The Kafka group id to use in the Kafka consumer while reading from Kafka. Use this with caution.
+  By default, each query generates a unique group id for reading data. This ensures that each Kafka
+  source has its own consumer group that does not face interference from any other consumer, and
+  therefore can read all of the partitions of its subscribed topics. In some scenarios (for example,
+  Kafka group-based authorization), you may want to use a specific authorized group id to read data.
+  You can optionally set the group id. However, do this with extreme caution as it can cause
+  unexpected behavior. Concurrently running queries (batch or streaming) or sources with the
+  same group id are likely to interfere with each other, causing each query to read only part of the
+  data. This may also occur when queries are started/restarted in quick succession. To minimize such
+  issues, set the Kafka consumer session timeout (by setting option "kafka.session.timeout.ms") to
+  be very small. When this is set, option "groupIdPrefix" will be ignored.</td>
 </tr>
 </table>

@@ -592,8 +610,9 @@ for parameters related to writing data.
 Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:

 - **group.id**: Kafka source will create a unique group id for each query automatically. The user can
-  set the prefix of the automatically generated group.id's via the optional source option `groupIdPrefix`, default value
-  is "spark-kafka-source".
+  set the prefix of the automatically generated group.id's via the optional source option `groupIdPrefix`;
+  the default value is "spark-kafka-source". You can also set "kafka.group.id" to force Spark to use a
+  specific group id; however, read the warnings for this option and use it with caution.
 - **auto.offset.reset**: Set the source option `startingOffsets` to specify
   where to start instead. Structured Streaming manages which offsets are consumed internally, rather
   than relying on the Kafka Consumer to do it. This will ensure that no data is missed when new
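
For illustration, a minimal sketch of a streaming read that pins the consumer group id as documented above; the SparkSession `spark`, broker address, topic name, and group id are placeholder assumptions:

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")  // placeholder broker
  .option("subscribe", "events")                    // placeholder topic
  // Authorized group id; read the warnings above before setting this.
  .option("kafka.group.id", "authorized-group")
  // Keep the session timeout small so consumers from a previous run are
  // evicted from the group quickly after a restart.
  .option("kafka.session.timeout.ms", "10000")
  .load()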

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala

Lines changed: 16 additions & 2 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010
 import java.{util => ju}
 import java.util.concurrent.TimeoutException

-import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetOutOfRangeException}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, OffsetOutOfRangeException}
 import org.apache.kafka.common.TopicPartition

 import org.apache.spark.TaskContext

@@ -167,7 +167,21 @@ class KafkaContinuousScanConfigBuilder(

     val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
     if (deletedPartitions.nonEmpty) {
-      reportDataLoss(s"Some partitions were deleted: $deletedPartitions")
+      val message = if (
+          offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+        s"$deletedPartitions are gone. Kafka option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' " +
+          "has been set on this query; it is not recommended to set this option. This option is " +
+          "unsafe to use since multiple concurrent queries or sources using the same group id " +
+          "will interfere with each other as they are part of the same consumer group. Restarted " +
+          "queries may also suffer interference from the previous run having the same group id. " +
+          "The user should have only one query per group id, and/or set " +
+          "'kafka.session.timeout.ms' to be very small so that the Kafka consumers from the " +
+          "previous query are marked dead by the Kafka group coordinator before the restarted " +
+          "query starts running."
+      } else {
+        s"$deletedPartitions are gone. Some data may have been missed."
+      }
+      reportDataLoss(message)
     }

     val startOffsets = newPartitionOffsets ++

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala

Lines changed: 16 additions & 1 deletion
@@ -22,6 +22,7 @@ import java.io._
 import java.nio.charset.StandardCharsets

 import org.apache.commons.io.IOUtils
+import org.apache.kafka.clients.consumer.ConsumerConfig

 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging

@@ -122,7 +123,21 @@ private[kafka010] class KafkaMicroBatchReadSupport(
     // Find deleted partitions, and report data loss if required
     val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
     if (deletedPartitions.nonEmpty) {
-      reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
+      val message =
+        if (kafkaOffsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+          s"$deletedPartitions are gone. Kafka option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' " +
+            "has been set on this query; it is not recommended to set this option. This option is " +
+            "unsafe to use since multiple concurrent queries or sources using the same group id " +
+            "will interfere with each other as they are part of the same consumer group. Restarted " +
+            "queries may also suffer interference from the previous run having the same group id. " +
+            "The user should have only one query per group id, and/or set " +
+            "'kafka.session.timeout.ms' to be very small so that the Kafka consumers from the " +
+            "previous query are marked dead by the Kafka group coordinator before the restarted " +
+            "query starts running."
+        } else {
+          s"$deletedPartitions are gone. Some data may have been missed."
+        }
+      reportDataLoss(message)
     }

     // Use the end partitions to calculate offset ranges to ignore partitions that have

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala

Lines changed: 4 additions & 2 deletions
@@ -45,7 +45,7 @@ import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
  */
 private[kafka010] class KafkaOffsetReader(
     consumerStrategy: ConsumerStrategy,
-    driverKafkaParams: ju.Map[String, Object],
+    val driverKafkaParams: ju.Map[String, Object],
     readerOptions: Map[String, String],
     driverGroupIdPrefix: String) extends Logging {
   /**

@@ -81,7 +81,9 @@ private[kafka010] class KafkaOffsetReader(
     assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
     if (_consumer == null) {
       val newKafkaParams = new ju.HashMap[String, Object](driverKafkaParams)
-      newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId())
+      if (driverKafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG) == null) {
+        newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId())
+      }
       _consumer = consumerStrategy.createConsumer(newKafkaParams)
     }
     _consumer
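
In isolation, the guard introduced above amounts to the following sketch; `resolveConsumerParams`, `userParams`, and `prefix` are hypothetical stand-ins, not Spark APIs:

import java.{util => ju}
import java.util.UUID
import org.apache.kafka.clients.consumer.ConsumerConfig

// "Generate only if unset": a user-supplied "kafka.group.id" wins; otherwise
// a fresh prefixed group id is generated for the consumer.
def resolveConsumerParams(
    userParams: ju.Map[String, Object],
    prefix: String): ju.Map[String, Object] = {
  val params = new ju.HashMap[String, Object](userParams)
  if (userParams.get(ConsumerConfig.GROUP_ID_CONFIG) == null) {
    params.put(ConsumerConfig.GROUP_ID_CONFIG, s"$prefix-${UUID.randomUUID}")
  }
  params
}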

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala

Lines changed: 15 additions & 1 deletion
@@ -22,6 +22,7 @@ import java.io._
 import java.nio.charset.StandardCharsets

 import org.apache.commons.io.IOUtils
+import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.TopicPartition

 import org.apache.spark.SparkContext

@@ -241,7 +242,20 @@ private[kafka010] class KafkaSource(

     val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
     if (deletedPartitions.nonEmpty) {
-      reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
+      val message = if (kafkaReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+        s"$deletedPartitions are gone. Kafka option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' " +
+          "has been set on this query; it is not recommended to set this option. This option is " +
+          "unsafe to use since multiple concurrent queries or sources using the same group id " +
+          "will interfere with each other as they are part of the same consumer group. Restarted " +
+          "queries may also suffer interference from the previous run having the same group id. " +
+          "The user should have only one query per group id, and/or set " +
+          "'kafka.session.timeout.ms' to be very small so that the Kafka consumers from the " +
+          "previous query are marked dead by the Kafka group coordinator before the restarted " +
+          "query starts running."
+      } else {
+        s"$deletedPartitions are gone. Some data may have been missed."
+      }
+      reportDataLoss(message)
     }

     // Use the until partitions to calculate offset ranges to ignore partitions that have

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala

Lines changed: 16 additions & 5 deletions
@@ -340,9 +340,19 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     // Validate user-specified Kafka options

     if (caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.GROUP_ID_CONFIG}")) {
-      throw new IllegalArgumentException(
-        s"Kafka option '${ConsumerConfig.GROUP_ID_CONFIG}' is not supported as " +
-          s"user-specified consumer groups are not used to track offsets.")
+      logWarning(
+        s"It is not recommended to set Kafka option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}'. " +
+          "This option is unsafe to use since multiple concurrent queries or sources using the " +
+          "same group id will interfere with each other as they are part of the same consumer " +
+          "group. Restarted queries may also suffer interference from the previous run having the " +
+          "same group id. The user should have only one query per group id, and/or set " +
+          "'kafka.session.timeout.ms' to be very small so that the Kafka consumers from the " +
+          "previous query are marked dead by the Kafka group coordinator before the restarted " +
+          "query starts running.")
+      if (caseInsensitiveParams.contains(GROUP_ID_PREFIX)) {
+        logWarning("Option 'groupIdPrefix' will be ignored as " +
+          s"option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' has been set.")
+      }
     }

     if (caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG}")) {

@@ -445,6 +455,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
   private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
   private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
   private val MIN_PARTITIONS_OPTION_KEY = "minpartitions"
+  private val GROUP_ID_PREFIX = "groupidprefix"

   val TOPIC_OPTION_KEY = "topic"

@@ -515,7 +526,7 @@
       .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")

       // So that consumers in executors do not mess with any existing group id
-      .set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-executor")
+      .setIfUnset(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-executor")

       // So that consumers in executors do not commit offsets unnecessarily
       .set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

@@ -534,7 +545,7 @@
       parameters: Map[String, String],
       metadataPath: String): String = {
     val groupIdPrefix = parameters
-      .getOrElse("groupIdPrefix", "spark-kafka-source")
+      .getOrElse(GROUP_ID_PREFIX, "spark-kafka-source")
     s"${groupIdPrefix}-${UUID.randomUUID}-${metadataPath.hashCode}"
   }
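
As a concrete illustration of the generated id produced by the code above when "kafka.group.id" is not set; the metadata path value here is a hypothetical placeholder:

import java.util.UUID

// Yields something like "spark-kafka-source-<uuid>-<hash>".
val groupIdPrefix = "spark-kafka-source" // or the user's "groupIdPrefix" option
val metadataPath = "/checkpoints/query-1" // placeholder
val uniqueGroupId = s"${groupIdPrefix}-${UUID.randomUUID}-${metadataPath.hashCode}"
println(uniqueGroupId)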

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala

Lines changed: 27 additions & 0 deletions
@@ -581,6 +581,33 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     )
   }

+  test("allow group.id override") {
+    // Tests code path KafkaSourceProvider.{sourceSchema(.), createSource(.)}
+    // as well as KafkaOffsetReader.createConsumer(.)
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (1 to 10).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (11 to 20).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2))
+
+    val dsKafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.group.id", "id-" + Random.nextInt())
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(dsKafka)(
+      makeSureGetOffsetCalled,
+      CheckAnswer(1 to 30: _*)
+    )
+  }
+
   test("ensure stream-stream self-join generates only one offset in log and correct metrics") {
     val topic = newTopic()
     testUtils.createTopic(topic, partitions = 2)

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala

Lines changed: 12 additions & 0 deletions
@@ -239,6 +239,18 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
     testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty")
   }

+  test("allow group.id overriding") {
+    // Tests code path KafkaSourceProvider.createRelation(.)
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (1 to 10).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (11 to 20).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2))
+
+    val df = createDF(topic, withOptions = Map("kafka.group.id" -> "custom"))
+    checkAnswer(df, (1 to 30).map(_.toString).toDF())
+  }
+
   test("read Kafka transactional messages: read_committed") {
     val topic = newTopic()
     testUtils.createTopic(topic)
