From 946cb19a7b49381023a2ae8a5612a9175f14d476 Mon Sep 17 00:00:00 2001 From: Thomas Lavocat Date: Fri, 6 Sep 2024 11:31:23 +0200 Subject: [PATCH] ARTEMIS-5037: option to limit mirror propagation Add a new option in the Mirror settings to prevent a broker from propagating messages. When working with a topology where 4 nodes are forming a square and where each node in that square mirrors its two neighbors: a message leaving a corner can reach the opposite corner of the square by two different routes. This is causing the message ordering to get broken. example: 1 <-> 2 ^ ^ | | v v 4 <-> 3 A message from 1 will reach 3 by 2 and 4. Message duplication checks will prevent the message from being duplicated but won't help regarding the order of the messages. This is because a either the route by 2 or 4 can be faster than the other, so whomever wins the race sets the message first. Fixing the example: Using the new option to not forward messages coming from a link, we break the possibilities to have two routes to reach the opposite corner. The above example is updated as followed: * 2 never forwards messages coming from 1 * 1 never forwards messages coming from 2 * 3 never forwards messages coming from 4 * 4 never forwards messages coming from 3 Now, when a messages leaves 1: * it reaches 2 and stops there * it reaches 4 * it reaches 3 through 4 and stops there Now, when a messages leaves 2: * it reaches 1 and stops there * it reaches 3 * it reaches 4 through 3 and stops there Now, when a messages leaves 3: * it reaches 4 and stops there * it reaches 2 * it reaches 1 through 2 and stops there Now, when a messages leaves 4: * it reaches 3 and stops there * it reaches 1 * it reaches 2 through 1 and stops there The new test AMQPSquareMirroringTest.java is testing this exact setup. --- .../amqp/connect/AMQPBrokerConnection.java | 16 +- .../mirror/AMQPMirrorControllerSource.java | 11 + .../mirror/AMQPMirrorControllerTarget.java | 17 +- .../amqp/proton/AMQPConnectionContext.java | 14 +- .../AMQPMirrorBrokerConnectionElement.java | 11 + .../impl/FileConfigurationParser.java | 3 +- .../schema/artemis-configuration.xsd | 8 + .../amqp/connect/AMQPSquareMirroringTest.java | 282 ++++++++++++++++++ 8 files changed, 350 insertions(+), 12 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSquareMirroringTest.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java index f47cf26dd367..b67d8e9c8ad7 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.protocol.amqp.connect; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -430,16 +431,19 @@ private void doConnect() { final Queue queue = server.locateQueue(getMirrorSNF(replica)); final boolean coreTunnelingEnabled = isCoreMessageTunnelingEnabled(replica); - final Symbol[] desiredCapabilities; + ArrayList desiredCapabilitiesList = new ArrayList<>(); + desiredCapabilitiesList.add(AMQPMirrorControllerSource.MIRROR_CAPABILITY); if (coreTunnelingEnabled) { - desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY, - AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT}; - } else { - desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY}; + desiredCapabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT); + } + if (!replica.getCanForwardMessages()) { + desiredCapabilitiesList.add(AMQPMirrorControllerSource.NO_FORWARD); } - final Symbol[] requiredOfferedCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY}; + final Symbol[] desiredCapabilities = (Symbol[]) desiredCapabilitiesList.toArray(new Symbol[]{}); + + final Symbol[] requiredOfferedCapabilities = replica.getCanForwardMessages() ? new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY} : new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY, AMQPMirrorControllerSource.NO_FORWARD}; connectSender(queue, queue.getName().toString(), diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java index 5081f08c0d59..cfa61c87bc04 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java @@ -89,9 +89,11 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im // Capabilities public static final Symbol MIRROR_CAPABILITY = Symbol.getSymbol("amq.mirror"); public static final Symbol QPID_DISPATCH_WAYPOINT_CAPABILITY = Symbol.valueOf("qd.waypoint"); + public static final Symbol NO_FORWARD = Symbol.getSymbol("amq.no.forward"); public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.of(INTERNAL_ID.toString()); public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.of(BROKER_ID.toString()); + public static final SimpleString INTERNAL_NO_FORWARD = SimpleString.of(NO_FORWARD.toString()); private static final ThreadLocal mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null)); @@ -301,6 +303,10 @@ private boolean invalidTarget(MirrorController controller) { return controller != null && sameNode(getRemoteMirrorId(), controller.getRemoteMirrorId()); } + private boolean isBlockedByNoForward(Message message) { + return Boolean.TRUE.equals(message.getBrokerProperty(INTERNAL_NO_FORWARD)); + } + private boolean ignoreAddress(SimpleString address) { if (address.startsWith(server.getConfiguration().getManagementAddress())) { return true; @@ -344,6 +350,11 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context) return; } + if (isBlockedByNoForward(message)) { + logger.trace("sendMessage::server {} is discarding the message because its source is setting a noForward policy", server); + return; + } + logger.trace("sendMessage::{} send message {}", server, message); try { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java index 3517e0cdbd84..3bdc7f1037da 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java @@ -57,6 +57,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver; import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.pools.MpscPool; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; @@ -77,8 +78,10 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_QUEUE; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY; +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.NO_FORWARD; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID; +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_NO_FORWARD; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY; @@ -92,6 +95,8 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement private static final ThreadLocal CONTROLLER_THREAD_LOCAL = new ThreadLocal<>(); + private boolean noMessageForwarding = false; + public static void setControllerInUse(MirrorController controller) { CONTROLLER_THREAD_LOCAL.set(controller); } @@ -248,6 +253,13 @@ public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI, this.configuration = server.getConfiguration(); this.referenceNodeStore = sessionSPI.getProtocolManager().getReferenceIDSupplier(); mirrorContext = protonSession.getSessionSPI().getSessionContext(); + if (receiver.getRemoteDesiredCapabilities() != null) { + for (Symbol capability : receiver.getRemoteDesiredCapabilities()) { + if (capability == NO_FORWARD) { + this.noMessageForwarding = true; + } + } + } } @Override @@ -534,6 +546,9 @@ private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotat message.setBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY, internalID); message.setBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY, internalMirrorID); + if (noMessageForwarding) { + message.setBrokerProperty(INTERNAL_NO_FORWARD, true); + } if (internalAddress != null) { message.setAddress(internalAddress); @@ -590,4 +605,4 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context) // Do nothing } -} \ No newline at end of file +} diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index 17b54cf15b27..608279c172fc 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton; import java.net.URI; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -458,14 +459,19 @@ private void handleReplicaTargetLinkOpened(AMQPSessionContext protonSession, Rec return; } + ArrayList offeredCapabilitiesList = new ArrayList<>(); + offeredCapabilitiesList.add(AMQPMirrorControllerSource.MIRROR_CAPABILITY); // We need to check if the remote desires to send us tunneled core messages or not, and if // we support that we need to offer that back so it knows it can actually do core tunneling. if (verifyDesiredCapability(receiver, AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT)) { - receiver.setOfferedCapabilities(new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY, - AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT}); - } else { - receiver.setOfferedCapabilities(new Symbol[]{AMQPMirrorControllerSource.MIRROR_CAPABILITY}); + offeredCapabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT); + } + + // If the remote wants us to not forward any messages to other mirrors we need to offer that capability + if (verifyDesiredCapability(receiver, AMQPMirrorControllerSource.NO_FORWARD)){ + offeredCapabilitiesList.add(AMQPMirrorControllerSource.NO_FORWARD); } + receiver.setOfferedCapabilities((Symbol[]) offeredCapabilitiesList.toArray(new Symbol[]{})); protonSession.addReplicaTarget(receiver); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java index 944099c2a003..67377d36e87a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java @@ -30,6 +30,8 @@ public class AMQPMirrorBrokerConnectionElement extends AMQPBrokerConnectionEleme boolean queueCreation = true; + boolean canForwardMessages = true; + boolean queueRemoval = true; boolean messageAcknowledgements = true; @@ -75,6 +77,15 @@ public AMQPMirrorBrokerConnectionElement setQueueCreation(boolean queueCreation) return this; } + public boolean getCanForwardMessages() { + return canForwardMessages; + } + + public AMQPMirrorBrokerConnectionElement setCanForwardMessages(boolean canForwardMessages) { + this.canForwardMessages = canForwardMessages; + return this; + } + public boolean isQueueRemoval() { return queueRemoval; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 588e0651fae9..1e40ed951469 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -2191,10 +2191,11 @@ private void parseAMQPBrokerConnections(final Element e, boolean durable = getBooleanAttribute(e2, "durable", true); boolean queueRemoval = getBooleanAttribute(e2, "queue-removal", true); boolean sync = getBooleanAttribute(e2, "sync", false); + boolean canForwardMessages = !getBooleanAttribute(e2, "no-messages-forwarding", false); String addressFilter = getAttributeValue(e2, "address-filter"); AMQPMirrorBrokerConnectionElement amqpMirrorConnectionElement = new AMQPMirrorBrokerConnectionElement(); - amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter).setSync(sync); + amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter).setSync(sync).setCanForwardMessages(canForwardMessages); connectionElement = amqpMirrorConnectionElement; connectionElement.setType(AMQPBrokerConnectionAddressType.MIRROR); diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index e16a7dc9b236..db64d3df8898 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -2298,6 +2298,14 @@ + + + + If this is true, the mirror at the opposite end of the link will not forward messages coming from that link to any other mirrors down the line. + This is false by default. + + + diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSquareMirroringTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSquareMirroringTest.java new file mode 100644 index 000000000000..77a0b4b26e77 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSquareMirroringTest.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp.connect; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class AMQPSquareMirroringTest extends AmqpClientTestSupport { + + protected static final int AMQP_PORT_2 = 5673; + protected static final int AMQP_PORT_3 = 5674; + protected static final int AMQP_PORT_4 = 5675; + + ActiveMQServer server_2; + ActiveMQServer server_3; + ActiveMQServer server_4; + + @Override + protected ActiveMQServer createServer() throws Exception { + return createServer(AMQP_PORT, false); + } + + protected String getConfiguredProtocols() { + return "AMQP,CORE,OPENWIRE"; + } + + @Test + public void testSquare() throws Exception { + server_2 = createServer(AMQP_PORT_2, false); + server_3 = createServer(AMQP_PORT_3, false); + server_4 = createServer(AMQP_PORT_4, false); + + // name the servers, for convenience during debugging + server.getConfiguration().setName("1"); + server_2.getConfiguration().setName("2"); + server_3.getConfiguration().setName("3"); + server_4.getConfiguration().setName("4"); + + /** + * + * Setup the mirroring topology to be a square: + * + * 1 <- - -> 2 + * ^ ^ The link between 1 and 2 and the + * | | link between 3 and 4 are noForward + * v v links in both directions. + * 4 <- - -> 3 + */ + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "1to2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setCanForwardMessages(false)); + server.getConfiguration().addAMQPConnection(amqpConnection); + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "1to4", "tcp://localhost:" + AMQP_PORT_4).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); + server.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "2to1", "tcp://localhost:" + AMQP_PORT).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setCanForwardMessages(false)); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "2to3", "tcp://localhost:" + AMQP_PORT_3).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "3to2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); + server_3.getConfiguration().addAMQPConnection(amqpConnection); + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "3to4", "tcp://localhost:" + AMQP_PORT_4).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setCanForwardMessages(false)); + server_3.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "4to1", "tcp://localhost:" + AMQP_PORT).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); + server_4.getConfiguration().addAMQPConnection(amqpConnection); + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "4to3", "tcp://localhost:" + AMQP_PORT_3).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setCanForwardMessages(false)); + server_4.getConfiguration().addAMQPConnection(amqpConnection); + } + + server.start(); + server_2.start(); + server_3.start(); + server_4.start(); + + createAddressAndQueues(server); + Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server_3.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server_4.locateQueue(getQueueName()) != null); + + Queue q1 = server.locateQueue(getQueueName()); + assertNotNull(q1); + + Queue q2 = server.locateQueue(getQueueName()); + assertNotNull(q2); + + Queue q3 = server.locateQueue(getQueueName()); + assertNotNull(q3); + + Queue q4 = server.locateQueue(getQueueName()); + assertNotNull(q4); + + ConnectionFactory factory = CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + AMQP_PORT); + ConnectionFactory factory2 = CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + AMQP_PORT_2); + ConnectionFactory factory3 = CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + AMQP_PORT_3); + ConnectionFactory factory4 = CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + AMQP_PORT_4); + + try (Connection conn = factory4.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + try (Connection conn = factory3.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 10; i < 20; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 20; i < 30; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + try (Connection conn = factory.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 30; i < 40; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + + Thread.sleep(100); // some time to allow eventual loops + + Wait.assertEquals(40L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(40L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(40L, q3::getMessageCount, 1000, 100); + Wait.assertEquals(40L, q4::getMessageCount, 1000, 100); + + try (Connection conn = factory.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + + Wait.assertEquals(30L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(30L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(30L, q3::getMessageCount, 1000, 100); + Wait.assertEquals(30L, q4::getMessageCount, 1000, 100); + + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 10; i < 20; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + + Wait.assertEquals(20L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(20L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(20L, q3::getMessageCount, 1000, 100); + Wait.assertEquals(20L, q4::getMessageCount, 1000, 100); + + try (Connection conn = factory3.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 20; i < 30; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + + Wait.assertEquals(10L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q3::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q4::getMessageCount, 1000, 100); + + try (Connection conn = factory4.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 30; i < 40; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + + Wait.assertEquals(0L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q3::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q4::getMessageCount, 1000, 100); + + try (Connection conn = factory.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + assertNull(consumer.receiveNoWait()); + consumer.close(); + } + + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + assertNull(consumer.receiveNoWait()); + consumer.close(); + } + + try (Connection conn = factory3.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + assertNull(consumer.receiveNoWait()); + consumer.close(); + } + + try (Connection conn = factory4.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + assertNull(consumer.receiveNoWait()); + consumer.close(); + } + + } +} \ No newline at end of file