Skip to content

Commit

Permalink
AMQP-827: Fix @rl reply Message<?> conversion
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/AMQP-827

Use the correct `payloadConverter` in the `MessagingMessageConverter`
to support `@RabbitListener` `Message<?>` return types.

**cherry-pick to 2.0.x, 1.7.x**
  • Loading branch information
garyrussell authored and artembilan committed Aug 8, 2018
1 parent 9ddf6d6 commit bdc75e4
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ protected final MessagingMessageConverter getMessagingMessageConverter() {
return this.messagingMessageConverter;
}

@Override
public void setMessageConverter(MessageConverter messageConverter) {
super.setMessageConverter(messageConverter);
this.messagingMessageConverter.setPayloadConverter(messageConverter);
}

@Override
public void onMessage(org.springframework.amqp.core.Message amqpMessage, Channel channel) throws Exception {
Message<?> message = toMessagingMessage(amqpMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -68,6 +69,7 @@
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerEndpoint;
Expand Down Expand Up @@ -123,6 +125,7 @@
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.test.annotation.DirtiesContext;
Expand Down Expand Up @@ -174,7 +177,8 @@ public class EnableRabbitIntegrationTests {
"test.notconverted.channel", "test.notconverted.messagechannel", "test.notconverted.messagingmessage",
"test.converted.foomessage", "test.notconverted.messagingmessagenotgeneric", "test.simple.direct",
"test.simple.direct2", "test.generic.list", "test.generic.map",
"amqp656dlq", "test.simple.declare", "test.return.exceptions", "test.pojo.errors", "test.pojo.errors2");
"amqp656dlq", "test.simple.declare", "test.return.exceptions", "test.pojo.errors", "test.pojo.errors2",
"test.messaging.message", "test.amqp.message");

@Autowired
private RabbitTemplate rabbitTemplate;
Expand Down Expand Up @@ -812,6 +816,26 @@ public void connectionName() {
assertThat(conn.getDelegate().getClientProvidedName(), equalTo("testConnectionName"));
}

@Test
public void messagingMessageReturned() {
Message message = org.springframework.amqp.core.MessageBuilder.withBody("\"messaging\"".getBytes())
.andProperties(MessagePropertiesBuilder.newInstance().setContentType("application/json").build()).build();
message = this.rabbitTemplate.sendAndReceive("test.messaging.message", message);
assertThat(message, is(notNullValue()));
assertThat(new String(message.getBody()), equalTo("{\"field\":\"MESSAGING\"}"));
assertThat(message.getMessageProperties().getHeaders().get("foo"), equalTo("bar"));
}

@Test
public void amqpMessageReturned() {
Message message = org.springframework.amqp.core.MessageBuilder.withBody("amqp".getBytes())
.andProperties(MessagePropertiesBuilder.newInstance().setContentType("text/plain").build()).build();
message = this.rabbitTemplate.sendAndReceive("test.amqp.message", message);
assertThat(message, is(notNullValue()));
assertThat(new String(message.getBody()), equalTo("AMQP"));
assertThat(message.getMessageProperties().getHeaders().get("foo"), equalTo("bar"));
}

interface TxService {

@Transactional
Expand Down Expand Up @@ -1108,6 +1132,22 @@ public Map<String, JsonObject> genericMap(JsonObject in) {
return Collections.singletonMap("key", in);
}

@RabbitListener(queues = "test.messaging.message", containerFactory = "simpleJsonListenerContainerFactory")
public org.springframework.messaging.Message<Bar> messagingMessage(String in) {
Bar bar = new Bar();
bar.field = in.toUpperCase();
return new GenericMessage<>(bar, Collections.singletonMap("foo", "bar"));
}

@RabbitListener(queues = "test.amqp.message")
public Message amqpMessage(String in) {
return org.springframework.amqp.core.MessageBuilder.withBody(in.toUpperCase().getBytes())
.andProperties(MessagePropertiesBuilder.newInstance().setContentType("text/plain")
.setHeader("foo", "bar")
.build())
.build();
}

}

public static class JsonObject {
Expand Down

0 comments on commit bdc75e4

Please sign in to comment.