Description
Setting group.id via .updateConsumerProperties() still makes the reader start reading at offset 0 for every job.
The setup
import java.util.HashMap;
import java.util.Map;

import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;

import com.google.cloud.teleport.kafka.connector.KafkaIO;
import com.google.cloud.teleport.kafka.connector.KafkaRecord;

Map<String, Object> props = new HashMap<>();
props.put("group.id", "dataflow-reader");
props.put("auto.offset.reset", "earliest");

PCollection<KafkaRecord<String, String>> pcol = p.apply(KafkaIO.<String, String>read()
    .withBootstrapServers(options.getBootstrapServers())
    .withTopics(topics)
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withNumSplits(1)
    .updateConsumerProperties(props));
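For comparison, a plain KafkaConsumer with these same properties resumes from the group's committed offsets, falling back to earliest only when the group has none. A minimal sketch of that expected behavior (the broker address and the explicit commit are illustrative, not taken from my pipeline):

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PlainConsumerCheck {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
    props.put("group.id", "dataflow-reader");         // same group.id as the pipeline above
    props.put("auto.offset.reset", "earliest");       // only used when the group has no committed offset
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(List.of("name-of-topic")); // topic name from the log below
      // On restart this resumes from the group's committed offsets rather
      // than offset 0, which is the behavior I expected from the pipeline above.
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
      records.forEach(r -> System.out.printf("offset=%d key=%s%n", r.offset(), r.key()));
      consumer.commitSync(); // commit so the next run resumes from here
    }
  }
}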
When I start a new job, this is logged in the console:
Reader-0: reading from name-of-topic-0 starting at offset 0
ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [xxxx]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = Reader-0_offset_consumer_778069295_dataflow-reader
Note that the logged group.id is not the configured "dataflow-reader" but a generated wrapper around it, and it looks like that happens here:
DataflowTemplates/src/main/java/com/google/cloud/teleport/kafka/connector/KafkaUnboundedReader.java, line 146 at commit 4788104:

String offsetGroupId =
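Judging from the logged group.id, that line seems to wrap the configured group.id into a fresh, randomized offset-consumer group id. Here is a sketch reconstructed from the log line above (the name and groupId parameters are my stand-ins, not the actual fields in KafkaUnboundedReader):

import java.util.Random;

class OffsetGroupIdSketch {
  // Reconstructed from the observed log line
  // "group.id = Reader-0_offset_consumer_778069295_dataflow-reader":
  // name = "Reader-0", random int = 778069295, groupId = "dataflow-reader".
  static String offsetGroupId(String name, String groupId) {
    // Because the group id is randomized per reader instance, every new
    // job gets a consumer group with no committed offsets, so reading
    // starts at offset 0 regardless of the configured group.id.
    return String.format(
        "%s_offset_consumer_%d_%s",
        name,
        new Random().nextInt(Integer.MAX_VALUE),
        groupId == null ? "none" : groupId);
  }
}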
Is it possible to make the reader not start from offset 0 for each new Dataflow job instance?