Basic offset management in Kafka binder

Problem Statement

I want to write a Spring Cloud Stream Kafka consumer application and am not sure how it manages Kafka consumer offsets. Can you explain?

Solution

Here it is in a nutshell:

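Kafka supports two starting offsets by default: earliest and latest. With earliest, the consumer starts from the oldest record still available in the partition. With latest, it starts from the end of the partition and sees only records published afterwards.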

Assume you are running the consumer for the first time. If you omit the group.id in your Spring Cloud Stream application, it becomes an anonymous consumer. By default, an anonymous consumer starts from the latest available offset in the topic partition. On the other hand, if you explicitly specify a group.id, the Spring Cloud Stream application starts, by default, from the earliest available offset in the topic partition.
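
As a minimal sketch of the two first-run cases, the fragment below assumes a hypothetical binding named consume-in-0 that reads a hypothetical topic named orders. It also assumes the group is supplied through the binding-level group property, which the Kafka binder uses as the consumer's group.id. None of these names come from the recipe itself.

```properties
# Hypothetical binding and topic names, for illustration only.
spring.cloud.stream.bindings.consume-in-0.destination=orders

# Named group: on a first run, consumption starts from the earliest available offset.
spring.cloud.stream.bindings.consume-in-0.group=orders-processor

# Leaving out the group line above would make this an anonymous consumer,
# which starts from the latest available offset on a first run.
```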

In both cases above (consumers with explicit groups and anonymous consumers), the starting offset can be changed by setting the property spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset to either earliest or latest.
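
For instance, a consumer with an explicit group that should skip the existing backlog on its first run might be configured roughly as follows. The binding name consume-in-0 and the group name orders-processor are hypothetical placeholders.

```properties
# Hypothetical binding and group names, for illustration only.
spring.cloud.stream.bindings.consume-in-0.group=orders-processor

# Override the first-run default (earliest for a named group) with latest.
spring.cloud.stream.kafka.bindings.consume-in-0.consumer.startOffset=latest
```

This setting only matters while no committed offset exists for the group.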

Now, assume that you have already run the consumer before and are starting it again. In this case, the starting offset semantics described above do not apply, because the consumer finds an already committed offset for the consumer group. (In the case of an anonymous consumer, although the application does not provide a group.id, the binder auto-generates one for you.) It simply picks up from the last committed offset onward. This is true even when a startOffset value is provided.

However, you can override the default behavior, in which the consumer starts from the last committed offset, by using the resetOffsets property. To do that, set the property spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets to true (it is false by default). Then make sure you provide the startOffset value (either earliest or latest). With both settings in place, each time the consumer application starts, it behaves as if it were starting for the first time and ignores any committed offsets for the partition.
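
Put together, a configuration that replays the topic from the beginning on every restart could look like the sketch below. As before, the binding name consume-in-0 and the group name orders-processor are hypothetical; only the resetOffsets and startOffset properties come from the recipe.

```properties
# Hypothetical binding and group names, for illustration only.
spring.cloud.stream.bindings.consume-in-0.group=orders-processor

# Ignore any committed offsets on startup (false by default).
spring.cloud.stream.kafka.bindings.consume-in-0.consumer.resetOffsets=true

# Position to reset to on each start: earliest or latest.
spring.cloud.stream.kafka.bindings.consume-in-0.consumer.startOffset=earliest
```

Removing the resetOffsets line (or setting it back to false) returns the consumer to resuming from its last committed offset.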