Description
Discussed in #8629
Originally posted by Antxl May 21, 2023
I'm writing a custom Spring Cloud Stream binder for a JMS middleware, and I'm using JmsMessageDrivenEndpoint as the MessageProducer.
It requires a ChannelPublishingJmsMessageListener, and I had difficulty applying the RetryTemplate built by AbstractBinder to it so that the maxAttempts property takes effect.
Let's look at its method public void onMessage(javax.jms.Message jmsMessage, Session session):
Message<?> requestMessage;
try {
    final Object result;
    if (this.extractRequestPayload) {
        result = this.messageConverter.fromMessage(jmsMessage);
        this.logger.debug(...);
    }
    else {
        result = jmsMessage;
    }
    Map<String, Object> headers = this.headerMapper.toHeaders(jmsMessage);
    requestMessage = // Here we get the integration message
            (result instanceof Message<?>) ?
                    this.messageBuilderFactory.fromMessage((Message<?>) result).copyHeaders(headers).build() :
                    this.messageBuilderFactory.withPayload(result).copyHeaders(headers).build();
}
catch (RuntimeException e) {
    ...
    return;
}
if (!this.expectReply) { // Send to downstream directly here
    this.gatewayDelegate.send(requestMessage);
}
else { ... // Won't go here
As you can see, it's impossible to capture the requestMessage from outside and put it into the RetryContext without repeating the whole process of building the integration message purely for error handling. (And messageConverter, headerMapper, etc. cannot be accessed from subclasses.)
Maybe I should write a new MessageListener, but that would mean giving up JmsMessageDrivenEndpoint as well (obviously not an option for me). For now I "solved" it by producing an identical message in the subclass, as in the code below:
@Override
public void onMessage(javax.jms.Message jmsMessage, Session session) throws JMSException {
    Message<?> requestMessage = getIntegrationMessage(jmsMessage);
    if (retryOp != null)
        retryOp.execute(context -> {
            context.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, requestMessage);
            super.onMessage(jmsMessage, session);
            return null;
        }, recoveryCallback);
    else
        super.onMessage(jmsMessage, session);
}
protected Message<?> getIntegrationMessage(javax.jms.Message jmsMessage) throws JMSException {
    try {
        Object result = messageConverter.fromMessage(jmsMessage);
        Map<String, Object> headers = headerMapper.toHeaders(jmsMessage);
        return result instanceof Message<?> ?
                messageBuilderFactory.fromMessage((Message<?>) result).copyHeaders(headers).build() :
                messageBuilderFactory.withPayload(result).copyHeaders(headers).build();
    } catch (RuntimeException e) {
        return null; // It doesn't matter here since it will fail anyway later
    }
}
Would it be possible to extract the send step into a method such as sendAndReceiveReply, or to allow a custom MessageGateway? Thanks.
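
To make the request concrete, here is a minimal sketch of the kind of hook I have in mind. It deliberately uses stand-in types rather than the real Spring Integration classes: SketchListener, RetryingSketchListener, sendRequest and failedMessages are all hypothetical names, a String stands in for the integration Message, and a plain loop stands in for RetryTemplate. The only point is that once the send step is an overridable method, a subclass receives the already-built message and does not have to rebuild it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for ChannelPublishingJmsMessageListener (NOT the real class).
class SketchListener {
    private final Consumer<String> downstream; // stands in for gatewayDelegate

    SketchListener(Consumer<String> downstream) {
        this.downstream = downstream;
    }

    // Same shape as onMessage: build the message once, then delegate to a hook.
    public final void onMessage(String rawMessage) {
        // Stands in for messageConverter + headerMapper + messageBuilderFactory.
        String requestMessage = "converted:" + rawMessage;
        sendRequest(requestMessage);
    }

    // Proposed extension point (hypothetical name): the built message is visible here.
    protected void sendRequest(String requestMessage) {
        downstream.accept(requestMessage);
    }
}

// A subclass that wraps only the send step in retry logic.
class RetryingSketchListener extends SketchListener {
    private final int maxAttempts;
    // Stands in for what a RecoveryCallback would receive after retries are exhausted.
    final List<String> failedMessages = new ArrayList<>();

    RetryingSketchListener(Consumer<String> downstream, int maxAttempts) {
        super(downstream);
        this.maxAttempts = maxAttempts;
    }

    @Override
    protected void sendRequest(String requestMessage) {
        // Simple loop as a placeholder for RetryTemplate.execute(...).
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                super.sendRequest(requestMessage);
                return; // delivered, nothing to recover
            } catch (RuntimeException e) {
                // fall through and try again until maxAttempts is reached
            }
        }
        // Recovery path: the original message is at hand without rebuilding it.
        failedMessages.add(requestMessage);
    }
}
```

With a hook like this, my subclass could put requestMessage into the retry context directly inside the override, and the getIntegrationMessage duplication above would go away.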