I have a Kafka producer application that sets some headers, but they are missing in the consumer application. Why is that?
Under normal circumstances, this should be fine.
Imagine, you have the following producer.
@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}
On the consumer side, you should still see the header "foo", and the following should not give you any issues.
@Bean
public Consumer<Message<String>> consume() {
return s -> {
final String foo = (String)s.getHeaders().get("foo");
System.out.println(foo);
};
}
If you provide a custom header mapper in the application, then this won’t work.
Let’s say you have an empty KafkaHeaderMapper
in the application.
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
}
};
}
If that is your implementation, then you will miss the foo
header on the consumer.
Chances are that, you may have some logic inside those KafkaHeaderMapper
methods.
You need the following to populate the foo
header.
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final String foo = (String) headers.get("foo");
target.add("foo", foo.getBytes());
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
final Header foo = source.lastHeader("foo");
target.put("foo", new String(foo.value()));
}
}
That will properly populate the foo
header from the producer to consumer.
In Spring Cloud Stream, the id
header is a special header, but some applications may want to have special custom id headers - something like custom-id
or ID
or Id
.
The first one (custom-id
) will propagate without any custom header mapper from producer to consumer.
However, if you produce with a variant of the framework reserved id
header - such as ID
, Id
, iD
etc. then you will run into issues with the internals of the framework.
See this StackOverflow thread fore more context on this use case: https://stackoverflow.com/questions/68412600/change-the-behaviour-in-spring-cloud-stream-make-header-matcher-case-sensitive
In that case, you must use a custom KafkaHeaderMapper
to map the case-sensitive id header.
For example, let’s say you have the following producer.
@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}
The header Id
above will be gone from the consuming side as it clashes with the framework id
header.
You can provide a custom KafkaHeaderMapper
to solve this issue.
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final String myId = (String) headers.get("Id");
target.add("Id", myId.getBytes());
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
final Header Id = source.lastHeader("Id");
target.put("Id", new String(Id.value()));
}
};
}
By doing this, both id
and Id
headers will be available from the producer to the consumer side.