Seeking to arbitrary offsets in Kafka

Problem Statement

With the Kafka binder, I know that I can set the offset to either earliest or latest, but I have a requirement to seek to an offset somewhere in the middle, that is, an arbitrary offset. Is there a way to achieve this using the Spring Cloud Stream Kafka binder?

Solution

Previously, we saw how the Kafka binder lets you handle basic offset management. By default, the binder does not let you rewind to an arbitrary offset, at least not through the mechanism we saw in that recipe. However, the binder provides some lower-level strategies to achieve this use case. Let’s explore them.

First of all, when you want to reset to an arbitrary offset other than earliest or latest, make sure to leave the resetOffsets configuration at its default value, which is false. Then provide a custom bean of type KafkaBindingRebalanceListener, which will be injected into all consumer bindings. It is an interface that comes with a few default methods, but here is the method that we are interested in:

/**
 * Invoked when partitions are initially assigned or after a rebalance. Applications
 * might only want to perform seek operations on an initial assignment. While the
 * 'initial' argument is true for each thread (when concurrency is greater than 1),
 * implementations should keep track of exactly which partitions have been sought.
 * There is a race in that a rebalance could occur during startup and so a topic/
 * partition that has been sought on one thread may be re-assigned to another
 * thread and you may not wish to re-seek it at that time.
 * @param bindingName the name of the binding.
 * @param consumer the consumer.
 * @param partitions the partitions.
 * @param initial true if this is the initial assignment on the current thread.
 */
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
        Collection<TopicPartition> partitions, boolean initial) {
    // do nothing
}

Let us look at the details.

In essence, this method is invoked whenever partitions are assigned: on the initial assignment and after every rebalance. To illustrate, let us assume that our topic is foo and that it has 4 partitions. Initially, we start only a single consumer in the group, so this consumer consumes from all partitions. When the consumer starts for the first time, all 4 partitions are assigned to it as its initial assignment. However, we do not want these partitions to start consuming from the default position (earliest, since we define a group); instead, we want each partition to start consuming after seeking to a specific offset. Imagine that you have a business case that requires consuming from the following offsets.

Partition   Start offset
0           1000
1           2000
2           2000
3           1000

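This could be achieved by implementing the above method in a listener class that is registered as a Spring bean, as below. The class name ArbitraryOffsetRebalanceListener is only illustrative.

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.cloud.stream.binder.kafka.KafkaBindingRebalanceListener;
import org.springframework.stereotype.Component;

// @Component registers this listener as a bean (assuming it is in a component-scanned package).
// The class name is illustrative.
@Component
public class ArbitraryOffsetRebalanceListener implements KafkaBindingRebalanceListener {

    // Desired start offset for each partition of topic "foo", built once instead of on every call.
    private final Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();

    public ArbitraryOffsetRebalanceListener() {
        topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
        topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
        topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
        topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);
    }

    @Override
    public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
            Collection<TopicPartition> partitions, boolean initial) {
        // Seek only on the initial assignment; after a rebalance, keep the current positions.
        if (initial) {
            partitions.forEach(tp -> {
                Long offset = topicPartitionOffset.get(tp);
                if (offset != null) {
                    try {
                        consumer.seek(tp, offset);
                    }
                    catch (Exception e) {
                        // Handle exceptions carefully, for example log and decide whether to fail.
                    }
                }
            });
        }
    }
}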

This is just a rudimentary implementation. Real-world use cases are usually much more complex, so you will need to adjust it accordingly, but it gives you a basic sketch. When a consumer seek fails, it may throw a runtime exception, and you need to decide what to do in that case.
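One possible policy, shown here as a minimal sketch under stated assumptions, is to keep seeking the remaining partitions when one seek fails and to report the failures, so that a failed partition simply stays at the position the consumer would otherwise use (its committed offset, or the start position for a new group). Kafka's Consumer.seek throws IllegalArgumentException for a negative offset and IllegalStateException if the partition is not currently assigned to the consumer, so those are the two exceptions caught below. To keep the sketch runnable without Kafka on the classpath, OffsetSeeker is a hypothetical stand-in for Consumer.seek, partitions are plain strings such as foo-0, and SeekWithFallback.seekAll is an illustrative helper, not part of the binder.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Consumer.seek(TopicPartition, long), so the sketch runs without Kafka.
interface OffsetSeeker {
    void seek(String partition, long offset);
}

// Illustrative helper, not part of Spring Cloud Stream or Kafka.
class SeekWithFallback {

    // Seeks every partition that has a configured start offset and
    // returns the partitions whose seek failed.
    static List<String> seekAll(OffsetSeeker consumer, Collection<String> partitions,
            Map<String, Long> startOffsets) {
        List<String> failed = new ArrayList<>();
        for (String tp : partitions) {
            Long offset = startOffsets.get(tp);
            if (offset == null) {
                continue; // no override configured: leave the partition at its current position
            }
            try {
                consumer.seek(tp, offset);
            }
            catch (IllegalArgumentException | IllegalStateException e) {
                // Policy choice: record the failure (a real listener would also log it) and keep going,
                // so one bad partition does not stop the others from being positioned.
                failed.add(tp);
            }
        }
        return failed;
    }
}
```

An alternative policy is to rethrow, so that the binding fails fast instead of consuming from a position you did not intend; which one fits depends on whether reprocessing or skipping records is acceptable for your use case.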

What if we start a second consumer with the same group id?

When we add a second consumer, a rebalance occurs and some partitions are moved around. Let’s say that the new consumer gets partitions 2 and 3. When this new Spring Cloud Stream consumer calls the onPartitionsAssigned method, this is the initial assignment for partitions 2 and 3 on this consumer, so it performs the seek operation because of the conditional check on the initial argument. The first consumer now has only partitions 0 and 1. For this consumer, however, the event is simply a rebalance and not an initial assignment, so it does not re-seek to the given offsets, again because of the conditional check on the initial argument.
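The Javadoc of onPartitionsAssigned also recommends keeping track of exactly which partitions have been sought, because with concurrency greater than 1 a partition already sought on one thread can be reassigned to another thread whose initial flag is true. The following is a minimal sketch of one way to do that within a single application instance: a thread-safe set, shared by all consumer threads that call the listener, records each sought partition so that it is sought at most once. So that it runs without Kafka on the classpath, Partition and Seeker are hypothetical stand-ins for TopicPartition and Consumer.seek, and SeekOnceListener is an illustrative name. An in-memory set cannot coordinate separate application instances, such as the second consumer described above, which will still seek its own initially assigned partitions.

```java
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
record Partition(String topic, int partition) { }

// Hypothetical stand-in for Consumer.seek(TopicPartition, long).
interface Seeker {
    void seek(Partition tp, long offset);
}

// Illustrative listener logic; in a real application it would live in onPartitionsAssigned.
class SeekOnceListener {

    private final Map<Partition, Long> startOffsets;

    // Shared by every consumer thread that calls this listener instance.
    private final Set<Partition> sought = ConcurrentHashMap.newKeySet();

    SeekOnceListener(Map<Partition, Long> startOffsets) {
        this.startOffsets = Map.copyOf(startOffsets);
    }

    void onPartitionsAssigned(Seeker consumer, Collection<Partition> partitions, boolean initial) {
        if (!initial) {
            return; // a plain rebalance on this thread: keep the current positions
        }
        for (Partition tp : partitions) {
            Long offset = startOffsets.get(tp);
            // Set.add returns false if some thread has already sought this partition.
            if (offset != null && sought.add(tp)) {
                consumer.seek(tp, offset);
            }
        }
    }
}
```

Because a partition is marked before its seek runs, a seek that throws is not retried on a later assignment; if that matters, remove the partition from the set when the seek fails.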