Skip to content

Kafka inbound channel adapter no longer adds 'id' and 'timestamp' headers #9801

Closed
@smitsjelle

Description

@smitsjelle

In what version(s) of Spring Integration are you seeing this issue?
Encountered in 6.4.1, seems to be introduced in 6.3.0

Describe the bug

When updating my application from SI 6.2.x to 6.4.1 I found out that whereas the Kafka inbound channel adapter previously produced messages with the MessageHeaders.ID and the MessageHeaders.TIMESTAMP headers, it stopped doing so.

On investigation, the changed behavior comes from GH-7925, which is why I'm posting the issue here. With this PR, MessageHistory#write(..) no longer defaults to execute message = messageBuilderFactory.fromMessage(message).setHeader(HEADER_NAME, history).build();, that would trigger default behavior to add the headers.

The root cause may be deeper in either Spring Framework or Spring Kafka: on conversion from Kafka Message to Spring Message, it is constructed by Spring Kafka eventually through MessagingMessageConverter#toMessage(..) that subsequently calls MessageBuilder.createMessage(..). This method doesn't do a .build() that would normally trigger the creation of the headersToUse that do include the MessageHeaders.ID and MessageHeaders.TIMESTAMP headers.

To Reproduce

In Spring Integration test MessageSourceTests#testAckCommon(), add 2 assertions with the header assertions that start in line 295:

assertThat(received.getHeaders().get(MessageHeaders.TIMESTAMP)).isNotNull();
assertThat(received.getHeaders().get(MessageHeaders.ID)).isNotNull();

This should result in test cases testAckAsyncCommits, testAckSyncCommits and testAckSyncCommitsTimeout to fail.

Expected behavior

I'd expect inbounds to consistently produce SI messages with ID and timestamp header. I tested some other inbound (JMS), that does not seem to have this issue.
Tested JMS by adjusting JmsInboundChannelAdapterTests#testTransactionalReceive() to:

JmsTemplate template = new JmsTemplate(connectionFactory);
template.convertAndSend("incatQ", "bar");
Message<?> message = out.receive(20000);
assertThat(message).isNotNull();
assertThat(message.getHeaders()).containsKeys(MessageHeaders.ID, MessageHeaders.TIMESTAMP);

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions