I want to write a Spring Cloud Stream Kafka consumer applicaiton and not sure about how it manages Kafka consumer offsets. Can you exaplain?
We encourage you read the docs section on this to get a thorough understanding on it. https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.3-SNAPSHOT/reference/html/spring-cloud-stream-binder-kafka.html#reset-offsets
Here is it in a gist:
Kafka supports two types of offsets to start with by default - earliest
and latest
.
Their semantics are self-explanatory from their names.
Assuming you are running the consumer for the first time.
If you miss the group.id in your Spring Cloud Stream application, then it becomes an anonymous consumer.
Whenever, you have an anonymous consumer, in that case, Spring Cloud Stream application by default will start from the latest
available offset in the topic partition.
On the other hand, if you explicitly specify a group.id, then by default, the Spring Cloud Stream application will start from the earliest
available offset in the topic partiton.
In both cases above (consumers with explicit groups and anonymous groups), the starting offset can be switched around by using the property spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset
and setting it to either earliest
or latest
.
Now, assume that you already ran the consumer before and now starting it again.
In this case, the starting offset semantics in the above case do not apply as the consumer finds an already committed offset for the consumer group (In the case of an anonymous consumer, although the application does not provide a group.id, the binder will auto generate one for you).
It simply picks up from the last committed offset onward.
This is true, even when you have a startOffset
value provided.
However, you can override the default behavior where the consumer starts from the last committed offset by using the resetOffsets
property.
In order to do that, set the property spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets
to true
(which is false
by default).
Then make sure you provide the startOffset
value (either earliest
or latest
).
When you do that and then start the consumer application, each time you start, it starts as if this is starting for the first time and ignore any committed offsets for the partition.