Adding custom header mapper in Kafka

Problem Statement

I have a Kafka producer application that sets some headers, but they are missing in the consumer application. Why is that?

Solution

Under normal circumstances, custom headers set by the producer reach the consumer without any extra configuration.

Imagine you have the following producer.

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}

On the consumer side, the foo header should still be present, and the following consumer should read it without any issues.

@Bean
public Consumer<Message<String>> consume() {
    return s -> {
        final String foo = (String)s.getHeaders().get("foo");
        System.out.println(foo);
    };
}

If you provide a custom header mapper in the application, it takes over the header mapping, and a header propagates only if your mapper handles it. Let’s say you have an empty KafkaHeaderMapper in the application.

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {

        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {

        }
    };
}

With that implementation, the foo header never reaches the consumer, because neither method copies anything. Chances are that you have some logic of your own inside those KafkaHeaderMapper methods. Either way, the mapper must handle the foo header explicitly in both directions, as follows.

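// Uses java.nio.charset.StandardCharsets for an explicit, platform-independent encoding.
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            // Outbound: copy the foo header into the Kafka record headers, if present.
            final String foo = (String) headers.get("foo");
            if (foo != null) {
                target.add("foo", foo.getBytes(StandardCharsets.UTF_8));
            }
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            // Inbound: restore the foo header from the Kafka record headers, if present.
            final Header foo = source.lastHeader("foo");
            if (foo != null) {
                target.put("foo", new String(foo.value(), StandardCharsets.UTF_8));
            }
        }
    };
}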

That will properly populate the foo header from the producer to consumer.
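Kafka header values are raw bytes, so the mapping of foo is essentially a string-to-bytes round trip. The following is a minimal sketch of that round trip in plain Java, with ordinary maps standing in for MessageHeaders and the Kafka Headers type. The class name HeaderRoundTrip and its methods are hypothetical and exist only for illustration; they are not part of Spring Cloud Stream or Kafka.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class HeaderRoundTrip {

    // Outbound direction: encode one named String header as UTF-8 bytes.
    // Headers that are absent (or not Strings) are skipped rather than failing.
    static void fromHeaders(Map<String, Object> messageHeaders,
                            Map<String, byte[]> recordHeaders, String name) {
        Object value = messageHeaders.get(name);
        if (value instanceof String) {
            recordHeaders.put(name, ((String) value).getBytes(StandardCharsets.UTF_8));
        }
    }

    // Inbound direction: decode the bytes back into a String header, if present.
    static void toHeaders(Map<String, byte[]> recordHeaders,
                          Map<String, Object> messageHeaders, String name) {
        byte[] raw = recordHeaders.get(name);
        if (raw != null) {
            messageHeaders.put(name, new String(raw, StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) {
        Map<String, Object> produced = new HashMap<>();
        produced.put("foo", "bar");

        // Stand-in for the headers carried on the Kafka record.
        Map<String, byte[]> onTheWire = new HashMap<>();
        fromHeaders(produced, onTheWire, "foo");
        fromHeaders(produced, onTheWire, "missing"); // absent header: nothing is added

        Map<String, Object> consumed = new HashMap<>();
        toHeaders(onTheWire, consumed, "foo");
        System.out.println(consumed.get("foo")); // prints "bar"
    }
}
```

Using the same explicit charset in both directions keeps the value stable even when the producer and the consumer run with different platform defaults.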

Special note on the id header

In Spring Cloud Stream, the id header is a special, framework-reserved header, but some applications may want their own custom id headers, such as custom-id, ID, or Id. The first one (custom-id) propagates from producer to consumer without any custom header mapper. However, if you produce with a case variant of the reserved id header (ID, Id, iD, and so on), you will run into issues with the internals of the framework. See this StackOverflow thread for more context on this use case: https://stackoverflow.com/questions/68412600/change-the-behaviour-in-spring-cloud-stream-make-header-matcher-case-sensitive

In that case, you must use a custom KafkaHeaderMapper to map the case-sensitive id header. For example, let’s say you have the following producer.

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}

The header Id above will be gone from the consuming side as it clashes with the framework id header. You can provide a custom KafkaHeaderMapper to solve this issue.

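// Uses java.nio.charset.StandardCharsets for an explicit, platform-independent encoding.
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            // Outbound: copy the custom Id header (capital I) as-is, if present.
            final String myId = (String) headers.get("Id");
            if (myId != null) {
                target.add("Id", myId.getBytes(StandardCharsets.UTF_8));
            }
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            // Inbound: restore the custom Id header under the same case-sensitive name.
            final Header id = source.lastHeader("Id");
            if (id != null) {
                target.put("Id", new String(id.value(), StandardCharsets.UTF_8));
            }
        }
    };
}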

With this mapper in place, both the id and Id headers are available on the consumer side.
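To illustrate why only the case variants clash while custom-id passes through untouched, here is a small plain-Java sketch. It assumes, as the title of the linked StackOverflow thread suggests, that the default header matching ignores case. The class ReservedHeaderCheck, its RESERVED set, and the method clashesWithReserved are hypothetical names used for illustration only; this is a model of the behavior, not the framework's actual code.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

public class ReservedHeaderCheck {

    // Hypothetical list of reserved header names, kept in lower case.
    private static final Set<String> RESERVED =
            new HashSet<>(Arrays.asList("id", "timestamp"));

    // Assumption: matching is case-insensitive, so the name is lower-cased
    // before it is compared against the reserved names.
    static boolean clashesWithReserved(String headerName) {
        return RESERVED.contains(headerName.toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        for (String name : new String[] {"Id", "ID", "iD", "custom-id"}) {
            System.out.println(name + " -> "
                    + (clashesWithReserved(name) ? "clashes with id" : "propagates"));
        }
    }
}
```

Under that assumption, Id, ID, and iD all collapse to id and clash with the reserved header, whereas custom-id is a different name altogether. A custom KafkaHeaderMapper sidesteps the problem because it looks up the exact, case-sensitive name.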