Description
In what version(s) of Spring Integration are you seeing this issue?
Encountered in 6.4.1, seems to be introduced in 6.3.0
Describe the bug
When updating my application from SI 6.2.x to 6.4.1 I found out that whereas the Kafka inbound channel adapter previously produced messages with the MessageHeaders.ID
and the MessageHeaders.TIMESTAMP
headers, it stopped doing so.
On investigation, the changed behavior comes from GH-7925, which is why I'm posting the issue here. With this PR, MessageHistory#write(..) no longer defaults to execute message = messageBuilderFactory.fromMessage(message).setHeader(HEADER_NAME, history).build();
, that would trigger default behavior to add the headers.
The root cause may be deeper in either Spring Framework or Spring Kafka: on conversion from Kafka Message to Spring Message, it is constructed by Spring Kafka eventually through MessagingMessageConverter#toMessage(..) that subsequently calls MessageBuilder.createMessage(..). This method doesn't do a .build()
that would normally trigger the creation of the headersToUse
that do include the MessageHeaders.ID
and MessageHeaders.TIMESTAMP
headers.
To Reproduce
In Spring Integration test MessageSourceTests#testAckCommon()
, add 2 assertions with the header assertions that start in line 295:
assertThat(received.getHeaders().get(MessageHeaders.TIMESTAMP)).isNotNull();
assertThat(received.getHeaders().get(MessageHeaders.ID)).isNotNull();
This should result in test cases testAckAsyncCommits
, testAckSyncCommits
and testAckSyncCommitsTimeout
to fail.
Expected behavior
I'd expect inbounds to consistently produce SI messages with ID and timestamp header. I tested some other inbound (JMS), that does not seem to have this issue.
Tested JMS by adjusting JmsInboundChannelAdapterTests#testTransactionalReceive()
to:
JmsTemplate template = new JmsTemplate(connectionFactory);
template.convertAndSend("incatQ", "bar");
Message<?> message = out.receive(20000);
assertThat(message).isNotNull();
assertThat(message.getHeaders()).containsKeys(MessageHeaders.ID, MessageHeaders.TIMESTAMP);