Skip to content

More Extendibility for ChannelPublishingJmsMessageListener? #8631

Closed
@artembilan

Description

@artembilan

Discussed in #8629

Originally posted by Antxl May 21, 2023
I'm making a custom cloud-stream-binder for a jms middleware, and I'm using JmsMessageDrivenEndpoint as MessageProducer.
It requires a ChannelPublishingJmsMessageListener where I had difficulties in applying the RetryTemplate built by AbstractBinder to that I can make maxAttempts property take effect.

Let's have a look at method public void onMessage(javax.jms.Message jmsMessage, Session session) in it.

		Message<?> requestMessage;
		try {
			final Object result;
			if (this.extractRequestPayload) {
				result = this.messageConverter.fromMessage(jmsMessage);
				this.logger.debug(...);
			}
			else {
				result = jmsMessage;
			}

			Map<String, Object> headers = this.headerMapper.toHeaders(jmsMessage);
			requestMessage = // Here we get the integration message
					(result instanceof Message<?>) ?
							this.messageBuilderFactory.fromMessage((Message<?>) result).copyHeaders(headers).build() :
							this.messageBuilderFactory.withPayload(result).copyHeaders(headers).build();
		}
		catch (RuntimeException e) {
			...
			return;
		}

		if (!this.expectReply) { // Send to downstream directly here
			this.gatewayDelegate.send(requestMessage);
		}
		else { ... // Won't go here

As you can see, it's impossible to capture the requestMessage from outside and put it into the ReteyContext unless repeating the whole process to build an Integration Message totally for error handling purpose. (And messageConverter, headerMapper, etc. cannot be accessed from sub-classes)

Maybe I should write a new MessageListener which means I must give up JmsMessageDrivenEndpoint as well (obviously not an option for me). Now I "solved" it by producing a identical message in the subclass, like code attached below:

    @Override
    public void onMessage(javax.jms.Message jmsMessage, Session session) throws JMSException {
        Message<?> requestMessage = getIntegrationMessage(jmsMessage);
        if (retryOp != null)
            retryOp.execute(context -> {
                context.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, requestMessage);
                super.onMessage(jmsMessage, session);
                return null;
            }, recoveryCallback);
        else
            super.onMessage(jmsMessage, session);
    }

    protected Message<?> getIntegrationMessage(javax.jms.Message jmsMessage) throws JMSException {
        try {
            Object result = messageConverter.fromMessage(jmsMessage);

            Map<String, Object> headers = headerMapper.toHeaders(jmsMessage);
            return result instanceof Message<?> ?
                    messageBuilderFactory.fromMessage((Message<?>) result).copyHeaders(headers).build() :
                    messageBuilderFactory.withPayload(result).copyHeaders(headers).build();
        } catch (RuntimeException e) {
            return null; // It doesn't matter here since it will fail anyway later
        }
    }

Would it possible to extract the send process to a method like sendAndReceiveReply or allow a custom MessageGateway? Thanks.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions