Use same group.id in consumer properties #23

@jocke-ek

Description

Setting the group.id in .updateConsumerProperties() still makes the reader start reading at offset 0 for every job.

The setup:

        Map<String, Object> props = new HashMap<>();
        props.put("group.id", "dataflow-reader");
        props.put("auto.offset.reset", "earliest");

        PCollection<KafkaRecord<String, String>> pcol = p.apply(KafkaIO.<String, String>read()
            .withBootstrapServers(options.getBootstrapServers())
            .withTopics(topics)
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withNumSplits(1)
            .updateConsumerProperties(props));

When I start a new job, this is logged to the console:

Reader-0: reading from name-of-topic-0 starting at offset 0
ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [xxxx]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = Reader-0_offset_consumer_778069295_dataflow-reader

And it looks like that happens here: the offset consumer does not use the configured group.id as-is, but a generated one (note group.id = Reader-0_offset_consumer_778069295_dataflow-reader in the log above).
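For illustration, the generated group in the log looks like it is built from a per-reader prefix, a random number, and the configured group. This is only a hypothetical reconstruction of that naming scheme (the class and method names below are made up, not KafkaIO's actual code):

```java
import java.util.Random;

public class OffsetGroupId {
    // Hypothetical sketch: the reader appears to derive its offset consumer's
    // group.id as "<reader name>_offset_consumer_<random>_<configured group.id>",
    // so the configured group is never used verbatim and committed offsets
    // from a previous job are not picked up.
    static String offsetConsumerGroup(String readerName, String configuredGroupId) {
        int nonce = new Random().nextInt(Integer.MAX_VALUE);
        return readerName + "_offset_consumer_" + nonce + "_" + configuredGroupId;
    }

    public static void main(String[] args) {
        // Matches the shape of the logged value, e.g.
        // Reader-0_offset_consumer_778069295_dataflow-reader
        System.out.println(offsetConsumerGroup("Reader-0", "dataflow-reader"));
    }
}
```

Because the random part changes on every start, each new job instance effectively gets a fresh consumer group, which would explain why auto.offset.reset = earliest kicks in and reading starts at offset 0.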

Is it possible to make the reader not start from offset 0 for each new Dataflow job instance?
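One possible direction (untested sketch, assuming a Beam release whose KafkaIO.Read exposes commitOffsetsInFinalize()): keep the group.id in the consumer properties and ask KafkaIO to commit offsets as bundles are finalized, so a restarted job with the same group can resume from the committed position instead of falling back to auto.offset.reset:

```java
// Configuration fragment only; reuses the names from the snippet above and
// assumes commitOffsetsInFinalize() exists in the Beam version in use.
PCollection<KafkaRecord<String, String>> pcol = p.apply(
    KafkaIO.<String, String>read()
        .withBootstrapServers(options.getBootstrapServers())
        .withTopics(topics)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withNumSplits(1)
        .updateConsumerProperties(props)   // props contains group.id
        .commitOffsetsInFinalize());       // commit offsets when bundles finalize
```

Whether this helps here depends on the internal offset consumer described above actually honoring the configured group on restart, so treat it as something to try rather than a confirmed fix.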


    Labels

    addition (New feature or request)
    blocked (Blocked by new features from Cloud Dataflow)
