Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CyclicBarrier;
Expand Down Expand Up @@ -61,6 +62,7 @@
/**
* @author Gary Russell
* @author Artem Bilan
* @author Glenn Renfro
*
* @since 4.0
*
Expand Down Expand Up @@ -126,8 +128,9 @@ public void pubSubLostConnectionTest() throws Exception {
this.channel.send(new GenericMessage<>("foo"));
latch.await(10, TimeUnit.SECONDS);
latch.reset();
BlockingQueueConsumer consumer = (BlockingQueueConsumer) TestUtils.getPropertyValue(this.channel,
"container.consumers", Set.class).iterator().next();
Iterator<BlockingQueueConsumer> blockingQueueConsumerIterator =
TestUtils.<Set<BlockingQueueConsumer>>getPropertyValue(this.channel, "container.consumers").iterator();
BlockingQueueConsumer consumer = blockingQueueConsumerIterator.next();
connectionFactory.destroy();
waitForNewConsumer(this.channel, consumer);
this.channel.send(new GenericMessage<>("bar"));
Expand All @@ -136,19 +139,18 @@ public void pubSubLostConnectionTest() throws Exception {
this.pubSubWithEP.destroy();
this.withEP.destroy();
this.pollableWithEP.destroy();
assertThat(TestUtils.getPropertyValue(connectionFactory, "connectionListener.delegates", Collection.class)
.size()).isEqualTo(0);
assertThat(TestUtils.<Collection<?>>getPropertyValue(connectionFactory, "connectionListener.delegates"))
.isEmpty();
}

@SuppressWarnings("unchecked")
private void waitForNewConsumer(PublishSubscribeAmqpChannel channel, BlockingQueueConsumer consumer)
throws Exception {

Lock consumersLock = TestUtils.getPropertyValue(channel, "container.consumersLock", Lock.class);
Lock consumersLock = TestUtils.getPropertyValue(channel, "container.consumersLock");
int n = 0;
while (n++ < 100) {
Set<BlockingQueueConsumer> consumers = TestUtils
.getPropertyValue(channel, "container.consumers", Set.class);
.getPropertyValue(channel, "container.consumers");
consumersLock.lock();
try {
if (!consumers.isEmpty()) {
Expand Down Expand Up @@ -246,8 +248,10 @@ public void extractPayloadTests() {
assertThat(received.getPayload()).isEqualTo(foo);
assertThat(received.getHeaders().get("baz")).isEqualTo("qux");

assertThat(TestUtils.getPropertyValue(this.pollableWithEP, "inboundHeaderMapper")).isSameAs(this.mapperIn);
assertThat(TestUtils.getPropertyValue(this.pollableWithEP, "outboundHeaderMapper")).isSameAs(this.mapperOut);
assertThat(TestUtils.<AmqpHeaderMapper>getPropertyValue(this.pollableWithEP, "inboundHeaderMapper"))
.isSameAs(this.mapperIn);
assertThat(TestUtils.<AmqpHeaderMapper>getPropertyValue(this.pollableWithEP, "outboundHeaderMapper"))
.isSameAs(this.mapperOut);
}

@Test
Expand All @@ -259,8 +263,7 @@ public void messageConversionTests() {
new SimpleMessageListenerContainer(this.connectionFactory), amqpTemplate);
channel.setBeanFactory(mock());
channel.afterPropertiesSet();
MessageListener listener = TestUtils.getPropertyValue(channel, "container.messageListener",
MessageListener.class);
MessageListener listener = TestUtils.getPropertyValue(channel, "container.messageListener");
willThrow(new MessageConversionException("foo", new IllegalStateException("bar")))
.given(messageConverter).fromMessage(any(org.springframework.amqp.core.Message.class));
assertThatExceptionOfType(MessageConversionException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
/**
* @author Mark Fisher
* @author Gary Russell
* @author Glenn Renfro
*
* @since 2.1
*/
@SpringJUnitConfig
Expand All @@ -62,54 +64,56 @@ public class AmqpChannelParserTests {
@Test
public void interceptor() {
MessageChannel channel = context.getBean("channelWithInterceptor", MessageChannel.class);
List<?> interceptorList = TestUtils.getPropertyValue(channel, "interceptors.interceptors", List.class);
assertThat(interceptorList.size()).isEqualTo(1);
List<?> interceptorList = TestUtils.getPropertyValue(channel, "interceptors.interceptors");
assertThat(interceptorList).hasSize(1);
assertThat(interceptorList.get(0).getClass()).isEqualTo(TestInterceptor.class);
assertThat(TestUtils.getPropertyValue(
TestUtils.getPropertyValue(channel, "dispatcher"), "maxSubscribers", Integer.class).intValue())
assertThat(TestUtils.<Integer>getPropertyValue(
TestUtils.getPropertyValue(channel, "dispatcher"), "maxSubscribers").intValue())
.isEqualTo(Integer.MAX_VALUE);
channel = context.getBean("pubSub", MessageChannel.class);
Object mbf = context.getBean(IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME);
assertThat(TestUtils.getPropertyValue(channel, "container.messageListener.messageBuilderFactory"))
assertThat(TestUtils.<Object>getPropertyValue(channel, "container.messageListener.messageBuilderFactory"))
.isSameAs(mbf);
assertThat(TestUtils.getPropertyValue(channel, "container.missingQueuesFatal", Boolean.class)).isTrue();
assertThat(TestUtils.getPropertyValue(channel, "container.transactional", Boolean.class)).isFalse();
assertThat(TestUtils.getPropertyValue(channel, "amqpTemplate.transactional", Boolean.class)).isFalse();
assertThat(TestUtils.getPropertyValue(channel, "container")).isInstanceOf(SimpleMessageListenerContainer.class);
assertThat(TestUtils.<Boolean>getPropertyValue(channel, "container.missingQueuesFatal")).isTrue();
assertThat(TestUtils.<Boolean>getPropertyValue(channel, "container.transactional")).isFalse();
assertThat(TestUtils.<Boolean>getPropertyValue(channel, "amqpTemplate.transactional")).isFalse();
assertThat(TestUtils.<SimpleMessageListenerContainer>getPropertyValue(channel, "container"))
.isInstanceOf(SimpleMessageListenerContainer.class);
}

@Test
public void subscriberLimit() {
MessageChannel channel = context.getBean("channelWithSubscriberLimit", MessageChannel.class);
assertThat(TestUtils.getPropertyValue(
TestUtils.getPropertyValue(channel, "dispatcher"), "maxSubscribers", Integer.class).intValue())
assertThat(TestUtils.<Integer>getPropertyValue(
TestUtils.getPropertyValue(channel, "dispatcher"), "maxSubscribers").intValue())
.isEqualTo(1);
assertThat(TestUtils.getPropertyValue(channel, "container.missingQueuesFatal", Boolean.class)).isFalse();
assertThat(TestUtils.getPropertyValue(channel, "container.transactional", Boolean.class)).isFalse();
assertThat(TestUtils.getPropertyValue(channel, "amqpTemplate.transactional", Boolean.class)).isTrue();
assertThat(TestUtils.getPropertyValue(channel, "extractPayload", Boolean.class)).isFalse();
assertThat(TestUtils.getPropertyValue(channel, "container")).isInstanceOf(DirectMessageListenerContainer.class);
assertThat(TestUtils.getPropertyValue(channel, "container.consumersPerQueue")).isEqualTo(2);
assertThat(TestUtils.<Boolean>getPropertyValue(channel, "container.missingQueuesFatal")).isFalse();
assertThat(TestUtils.<Boolean>getPropertyValue(channel, "container.transactional")).isFalse();
assertThat(TestUtils.<Boolean>getPropertyValue(channel, "amqpTemplate.transactional")).isTrue();
assertThat(TestUtils.<Boolean>getPropertyValue(channel, "extractPayload")).isFalse();
assertThat(TestUtils.<DirectMessageListenerContainer>getPropertyValue(channel, "container"))
.isInstanceOf(DirectMessageListenerContainer.class);
assertThat(TestUtils.<Integer>getPropertyValue(channel, "container.consumersPerQueue")).isEqualTo(2);
}

@Test
public void testMapping() {
checkExtract(this.pollableWithEP);
checkExtract(this.withEP);
checkExtract(this.pubSubWithEP);
assertThat(TestUtils.getPropertyValue(this.withEP, "defaultDeliveryMode"))
assertThat(TestUtils.<MessageDeliveryMode>getPropertyValue(this.withEP, "defaultDeliveryMode"))
.isEqualTo(MessageDeliveryMode.NON_PERSISTENT);
assertThat(TestUtils.getPropertyValue(this.withEP, "headersMappedLast", Boolean.class)).isFalse();
assertThat(TestUtils.getPropertyValue(this.pollableWithEP, "defaultDeliveryMode")).isNull();
assertThat(TestUtils.getPropertyValue(this.pollableWithEP, "headersMappedLast", Boolean.class)).isTrue();
assertThat(TestUtils.<Boolean>getPropertyValue(this.withEP, "headersMappedLast")).isFalse();
assertThat(TestUtils.<Object>getPropertyValue(this.pollableWithEP, "defaultDeliveryMode")).isNull();
assertThat(TestUtils.<Boolean>getPropertyValue(this.pollableWithEP, "headersMappedLast")).isTrue();
}

private void checkExtract(AbstractAmqpChannel channel) {
assertThat(TestUtils.getPropertyValue(channel, "outboundHeaderMapper").toString())
.contains("Mock for AmqpHeaderMapper");
assertThat(TestUtils.getPropertyValue(channel, "inboundHeaderMapper").toString())
.contains("Mock for AmqpHeaderMapper");
assertThat(TestUtils.getPropertyValue(channel, "extractPayload", Boolean.class)).isTrue();
assertThat(TestUtils.<Boolean>getPropertyValue(channel, "extractPayload")).isTrue();
}

private static class TestInterceptor implements ChannelInterceptor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
* @author Mark Fisher
* @author Artem Bilan
* @author Gary Russell
* @author Glenn Renfro
*
* @since 2.1
*/
Expand All @@ -62,41 +63,41 @@ public void verifyIdAsChannel() {
Object adapter = context.getBean("rabbitInbound.adapter");
assertThat(channel.getClass()).isEqualTo(DirectChannel.class);
assertThat(adapter.getClass()).isEqualTo(AmqpInboundChannelAdapter.class);
assertThat(TestUtils.getPropertyValue(adapter, "autoStartup")).isEqualTo(Boolean.TRUE);
assertThat(TestUtils.getPropertyValue(adapter, "phase")).isEqualTo(Integer.MAX_VALUE / 2);
assertThat(TestUtils.getPropertyValue(adapter, "messageListenerContainer.missingQueuesFatal", Boolean.class))
assertThat(TestUtils.<Boolean>getPropertyValue(adapter, "autoStartup")).isEqualTo(Boolean.TRUE);
assertThat(TestUtils.<Integer>getPropertyValue(adapter, "phase")).isEqualTo(Integer.MAX_VALUE / 2);
assertThat(TestUtils.<Boolean>getPropertyValue(adapter, "messageListenerContainer.missingQueuesFatal"))
.isTrue();
assertThat(TestUtils.getPropertyValue(adapter, "messageListenerContainer"))
assertThat(TestUtils.<SimpleMessageListenerContainer>getPropertyValue(adapter, "messageListenerContainer"))
.isInstanceOf(SimpleMessageListenerContainer.class);
assertThat(TestUtils.getPropertyValue(adapter, "batchMode", BatchMode.class))
assertThat(TestUtils.<BatchMode>getPropertyValue(adapter, "batchMode"))
.isEqualTo(BatchMode.EXTRACT_PAYLOADS);
assertThat(TestUtils.getPropertyValue(adapter, "messageListenerContainer.batchSize", Integer.class))
assertThat(TestUtils.<Integer>getPropertyValue(adapter, "messageListenerContainer.batchSize"))
.isEqualTo(2);
}

@Test
public void verifyDMCC() {
Object adapter = context.getBean("dmlc.adapter");
assertThat(adapter.getClass()).isEqualTo(AmqpInboundChannelAdapter.class);
assertThat(TestUtils.getPropertyValue(adapter, "messageListenerContainer.missingQueuesFatal", Boolean.class))
assertThat(TestUtils.<Boolean>getPropertyValue(adapter, "messageListenerContainer.missingQueuesFatal"))
.isFalse();
assertThat(TestUtils.getPropertyValue(adapter, "messageListenerContainer"))
assertThat(TestUtils.<DirectMessageListenerContainer>getPropertyValue(adapter, "messageListenerContainer"))
.isInstanceOf(DirectMessageListenerContainer.class);
assertThat(TestUtils.getPropertyValue(adapter, "messageListenerContainer.consumersPerQueue")).isEqualTo(2);
assertThat(TestUtils.getPropertyValue(adapter, "batchMode", BatchMode.class))
assertThat(TestUtils.<Integer>getPropertyValue(adapter, "messageListenerContainer.consumersPerQueue")).isEqualTo(2);
assertThat(TestUtils.<BatchMode>getPropertyValue(adapter, "batchMode"))
.isEqualTo(BatchMode.MESSAGES);
}

@Test
public void verifyLifeCycle() {
Object adapter = context.getBean("autoStartFalse.adapter");
assertThat(TestUtils.getPropertyValue(adapter, "autoStartup")).isEqualTo(Boolean.FALSE);
assertThat(TestUtils.getPropertyValue(adapter, "phase")).isEqualTo(123);
assertThat(TestUtils.getPropertyValue(adapter, "messageListenerContainer.acknowledgeMode"))
assertThat(TestUtils.<Boolean>getPropertyValue(adapter, "autoStartup")).isEqualTo(Boolean.FALSE);
assertThat(TestUtils.<Integer>getPropertyValue(adapter, "phase")).isEqualTo(123);
assertThat(TestUtils.<AcknowledgeMode>getPropertyValue(adapter, "messageListenerContainer.acknowledgeMode"))
.isEqualTo(AcknowledgeMode.NONE);
assertThat(TestUtils.getPropertyValue(adapter, "messageListenerContainer.missingQueuesFatal", Boolean.class))
assertThat(TestUtils.<Boolean>getPropertyValue(adapter, "messageListenerContainer.missingQueuesFatal"))
.isFalse();
assertThat(TestUtils.getPropertyValue(adapter, "messageListenerContainer.batchSize", Integer.class))
assertThat(TestUtils.<Integer>getPropertyValue(adapter, "messageListenerContainer.batchSize"))
.isEqualTo(3);
}

Expand All @@ -106,9 +107,8 @@ public void withHeaderMapperStandardAndCustomHeaders() throws Exception {
AmqpInboundChannelAdapter.class);

AbstractMessageListenerContainer mlc =
TestUtils.getPropertyValue(adapter, "messageListenerContainer", AbstractMessageListenerContainer.class);
ChannelAwareMessageListener listener = TestUtils.getPropertyValue(mlc, "messageListener",
ChannelAwareMessageListener.class);
TestUtils.<AbstractMessageListenerContainer>getPropertyValue(adapter, "messageListenerContainer");
ChannelAwareMessageListener listener = TestUtils.getPropertyValue(mlc, "messageListener");
MessageProperties amqpProperties = new MessageProperties();
amqpProperties.setAppId("test.appId");
amqpProperties.setClusterId("test.clusterId");
Expand All @@ -135,9 +135,8 @@ public void withHeaderMapperOnlyCustomHeaders() throws Exception {
AmqpInboundChannelAdapter.class);

AbstractMessageListenerContainer mlc =
TestUtils.getPropertyValue(adapter, "messageListenerContainer", AbstractMessageListenerContainer.class);
ChannelAwareMessageListener listener = TestUtils.getPropertyValue(mlc, "messageListener",
ChannelAwareMessageListener.class);
TestUtils.<AbstractMessageListenerContainer>getPropertyValue(adapter, "messageListenerContainer");
ChannelAwareMessageListener listener = TestUtils.getPropertyValue(mlc, "messageListener");
MessageProperties amqpProperties = new MessageProperties();
amqpProperties.setAppId("test.appId");
amqpProperties.setClusterId("test.clusterId");
Expand All @@ -164,9 +163,8 @@ public void withHeaderMapperNothingToMap() throws Exception {
AmqpInboundChannelAdapter.class);

AbstractMessageListenerContainer mlc =
TestUtils.getPropertyValue(adapter, "messageListenerContainer", AbstractMessageListenerContainer.class);
ChannelAwareMessageListener listener = TestUtils.getPropertyValue(mlc, "messageListener",
ChannelAwareMessageListener.class);
TestUtils.<AbstractMessageListenerContainer>getPropertyValue(adapter, "messageListenerContainer");
ChannelAwareMessageListener listener = TestUtils.getPropertyValue(mlc, "messageListener");
MessageProperties amqpProperties = new MessageProperties();
amqpProperties.setAppId("test.appId");
amqpProperties.setClusterId("test.clusterId");
Expand Down Expand Up @@ -194,9 +192,8 @@ public void withHeaderMapperDefaultMapping() throws Exception {
AmqpInboundChannelAdapter.class);

AbstractMessageListenerContainer mlc =
TestUtils.getPropertyValue(adapter, "messageListenerContainer", AbstractMessageListenerContainer.class);
ChannelAwareMessageListener listener = TestUtils.getPropertyValue(mlc, "messageListener",
ChannelAwareMessageListener.class);
TestUtils.<AbstractMessageListenerContainer>getPropertyValue(adapter, "messageListenerContainer");
ChannelAwareMessageListener listener = TestUtils.getPropertyValue(mlc, "messageListener");
MessageProperties amqpProperties = new MessageProperties();
amqpProperties.setAppId("test.appId");
amqpProperties.setClusterId("test.clusterId");
Expand Down
Loading