Description
Discussed in #8629
Originally posted by Antxl May 21, 2023
I'm making a custom cloud-stream-binder for a jms middleware, and I'm using JmsMessageDrivenEndpoint
as MessageProducer
.
It requires a ChannelPublishingJmsMessageListener
where I had difficulties in applying the RetryTemplate
built by AbstractBinder
to that I can make maxAttempts
property take effect.
Let's have a look at method public void onMessage(javax.jms.Message jmsMessage, Session session)
in it.
Message<?> requestMessage;
try {
final Object result;
if (this.extractRequestPayload) {
result = this.messageConverter.fromMessage(jmsMessage);
this.logger.debug(...);
}
else {
result = jmsMessage;
}
Map<String, Object> headers = this.headerMapper.toHeaders(jmsMessage);
requestMessage = // Here we get the integration message
(result instanceof Message<?>) ?
this.messageBuilderFactory.fromMessage((Message<?>) result).copyHeaders(headers).build() :
this.messageBuilderFactory.withPayload(result).copyHeaders(headers).build();
}
catch (RuntimeException e) {
...
return;
}
if (!this.expectReply) { // Send to downstream directly here
this.gatewayDelegate.send(requestMessage);
}
else { ... // Won't go here
As you can see, it's impossible to capture the requestMessage
from outside and put it into the ReteyContext
unless repeating the whole process to build an Integration Message totally for error handling purpose. (And messageConverter
, headerMapper
, etc. cannot be accessed from sub-classes)
Maybe I should write a new MessageListener
which means I must give up JmsMessageDrivenEndpoint
as well (obviously not an option for me). Now I "solved" it by producing a identical message in the subclass, like code attached below:
@Override
public void onMessage(javax.jms.Message jmsMessage, Session session) throws JMSException {
Message<?> requestMessage = getIntegrationMessage(jmsMessage);
if (retryOp != null)
retryOp.execute(context -> {
context.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, requestMessage);
super.onMessage(jmsMessage, session);
return null;
}, recoveryCallback);
else
super.onMessage(jmsMessage, session);
}
protected Message<?> getIntegrationMessage(javax.jms.Message jmsMessage) throws JMSException {
try {
Object result = messageConverter.fromMessage(jmsMessage);
Map<String, Object> headers = headerMapper.toHeaders(jmsMessage);
return result instanceof Message<?> ?
messageBuilderFactory.fromMessage((Message<?>) result).copyHeaders(headers).build() :
messageBuilderFactory.withPayload(result).copyHeaders(headers).build();
} catch (RuntimeException e) {
return null; // It doesn't matter here since it will fail anyway later
}
}
Would it possible to extract the send process to a method like sendAndReceiveReply
or allow a custom MessageGateway
? Thanks.