From ef80545432c8afdaa018ae1666d018622ac24328 Mon Sep 17 00:00:00 2001 From: Thomas Lavocat Date: Thu, 26 Sep 2024 18:03:32 +0200 Subject: [PATCH] ARTEMIS-5037: option to limit mirror propagation Add a new option in the Mirror settings to prevent a broker from propagating messages. When working with a topology where 4 nodes are forming a square and where each node in that square mirrors its two neighbors: a message leaving a corner can reach the opposite corner of the square by two different routes. This is causing the message ordering to get broken. example: 1 <-> 2 ^ ^ | | v v 4 <-> 3 A message from 1 will reach 3 by 2 and 4. Message duplication checks will prevent the message from being duplicated but won't help regarding the order of the messages. This is because a either the route by 2 or 4 can be faster than the other, so whomever wins the race sets the message first. Fixing the example: Using the new option to not forward messages coming from a link, we break the possibilities to have two routes to reach the opposite corner. The above example is updated as followed: * 2 never forwards messages coming from 1 * 1 never forwards messages coming from 2 * 3 never forwards messages coming from 4 * 4 never forwards messages coming from 3 Now, when a messages leaves 1: * it reaches 2 and stops there * it reaches 4 * it reaches 3 through 4 and stops there Now, when a messages leaves 2: * it reaches 1 and stops there * it reaches 3 * it reaches 4 through 3 and stops there Now, when a messages leaves 3: * it reaches 4 and stops there * it reaches 2 * it reaches 1 through 2 and stops there Now, when a messages leaves 4: * it reaches 3 and stops there * it reaches 1 * it reaches 2 through 1 and stops there The new test AMQPSquareMirroringTest.java is testing this exact setup. --- .../amqp/connect/AMQPBrokerConnection.java | 16 +- .../mirror/AMQPMirrorControllerSource.java | 41 +++ .../mirror/AMQPMirrorControllerTarget.java | 26 +- .../amqp/connect/mirror/AckManager.java | 11 +- .../connect/mirror/MirrorTransaction.java | 6 +- .../amqp/proton/AMQPConnectionContext.java | 14 +- .../AMQPMirrorBrokerConnectionElement.java | 11 + .../impl/FileConfigurationParser.java | 3 +- .../server/mirror/TargetMirrorController.java | 22 ++ .../schema/artemis-configuration.xsd | 8 + .../connect/AMQPMirrorConnectionTest.java | 183 +++++++++++- .../amqp/connect/AMQPSquareMirroringTest.java | 282 ++++++++++++++++++ 12 files changed, 599 insertions(+), 24 deletions(-) create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/TargetMirrorController.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSquareMirroringTest.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java index f47cf26dd36..a9f430af520 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.protocol.amqp.connect; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -430,16 +431,19 @@ private void doConnect() { final Queue queue = server.locateQueue(getMirrorSNF(replica)); final boolean coreTunnelingEnabled = isCoreMessageTunnelingEnabled(replica); - final Symbol[] desiredCapabilities; + ArrayList desiredCapabilitiesList = new ArrayList<>(); + desiredCapabilitiesList.add(AMQPMirrorControllerSource.MIRROR_CAPABILITY); if (coreTunnelingEnabled) { - desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY, - AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT}; - } else { - desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY}; + desiredCapabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT); + } + if (replica.getNoForward()) { + desiredCapabilitiesList.add(AMQPMirrorControllerSource.NO_FORWARD); } - final Symbol[] requiredOfferedCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY}; + final Symbol[] desiredCapabilities = (Symbol[]) desiredCapabilitiesList.toArray(new Symbol[]{}); + + final Symbol[] requiredOfferedCapabilities = replica.getNoForward() ? new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY, AMQPMirrorControllerSource.NO_FORWARD} : new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY}; connectSender(queue, queue.getName().toString(), diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java index 0d31363c6f1..5cdcfcffd43 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java @@ -89,9 +89,13 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im // Capabilities public static final Symbol MIRROR_CAPABILITY = Symbol.getSymbol("amq.mirror"); public static final Symbol QPID_DISPATCH_WAYPOINT_CAPABILITY = Symbol.valueOf("qd.waypoint"); + public static final Symbol NO_FORWARD = Symbol.getSymbol("amq.no.forward"); + public static final Symbol NO_FORWARD_SOURCE = Symbol.getSymbol("amq.no.forward.source"); public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.of(INTERNAL_ID.toString()); public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.of(BROKER_ID.toString()); + public static final SimpleString INTERNAL_NO_FORWARD = SimpleString.of(NO_FORWARD.toString()); + public static final SimpleString INTERNAL_NO_FORWARD_SOURCE = SimpleString.of(NO_FORWARD_SOURCE.toString()); private static final ThreadLocal mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null)); @@ -230,12 +234,17 @@ public void addAddress(AddressInfo addressInfo) throws Exception { public void deleteAddress(AddressInfo addressInfo) throws Exception { logger.trace("{} deleteAddress {}", server, addressInfo); + if (isBlockedByNoForward()) { + return; + } + if (invalidTarget(getControllerInUse()) || addressInfo.isInternal()) { return; } if (ignoreAddress(addressInfo.getName())) { return; } + if (deleteQueues) { Message message = createMessage(addressInfo.getName(), null, DELETE_ADDRESS, null, addressInfo.toJSON()); routeMirrorCommand(server, message); @@ -246,6 +255,10 @@ public void deleteAddress(AddressInfo addressInfo) throws Exception { public void createQueue(QueueConfiguration queueConfiguration) throws Exception { logger.trace("{} createQueue {}", server, queueConfiguration); + if (isBlockedByNoForward()) { + return; + } + if (invalidTarget(getControllerInUse()) || queueConfiguration.isInternal()) { if (logger.isTraceEnabled()) { logger.trace("Rejecting ping pong on create {} as isInternal={} and mirror target = {}", queueConfiguration, queueConfiguration.isInternal(), getControllerInUse()); @@ -264,6 +277,7 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception } return; } + if (addQueues) { Message message = createMessage(queueConfiguration.getAddress(), queueConfiguration.getName(), CREATE_QUEUE, null, queueConfiguration.toJSON()); routeMirrorCommand(server, message); @@ -276,6 +290,10 @@ public void deleteQueue(SimpleString address, SimpleString queue) throws Excepti logger.trace("{} deleteQueue {}/{}", server, address, queue); } + if (isBlockedByNoForward()) { + return; + } + if (invalidTarget(getControllerInUse())) { return; } @@ -310,6 +328,14 @@ private boolean invalidTarget(MirrorController controller) { return controller != null && sameNode(getRemoteMirrorId(), controller.getRemoteMirrorId()); } + private boolean isBlockedByNoForward() { + return getControllerInUse() != null && getControllerInUse().isNoForward(); + } + + private boolean isBlockedByNoForward(Message message) { + return isBlockedByNoForward() || Boolean.TRUE.equals(message.getBrokerProperty(INTERNAL_NO_FORWARD)); + } + private boolean ignoreAddress(SimpleString address) { if (address.startsWith(server.getConfiguration().getManagementAddress())) { return true; @@ -338,6 +364,11 @@ Message copyMessageForPaging(Message message) { public void sendMessage(Transaction tx, Message message, RoutingContext context) { SimpleString address = context.getAddress(message); + if (isBlockedByNoForward(message)) { + logger.trace("sendMessage::server {} is discarding the message because its source is setting a noForward policy", server); + return; + } + if (context.isInternal()) { logger.trace("sendMessage::server {} is discarding send to avoid sending to internal queue", server); return; @@ -353,6 +384,8 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context) return; } + logger.trace("sendMessage::{} send message {}", server, message); + try { context.setReusable(false); @@ -543,6 +576,14 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin logger.trace("preAcknowledge::tx={}, ref={}, reason={}", tx, ref, reason); } + if (Boolean.TRUE.equals(ref.getMessage().getBooleanProperty(INTERNAL_NO_FORWARD))) { + SimpleString nfsource = (SimpleString) ref.getMessage().getBrokerProperty(INTERNAL_NO_FORWARD_SOURCE); + String remoteMirrorId = getRemoteMirrorId(); + if (!SimpleString.of(remoteMirrorId).equals(nfsource)) { + return; + } + } + MirrorController controllerInUse = getControllerInUse(); // Retried ACKs are not forwarded. diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java index 27177f6ab30..d121a86cf85 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java @@ -39,7 +39,7 @@ import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl; -import org.apache.activemq.artemis.core.server.mirror.MirrorController; +import org.apache.activemq.artemis.core.server.mirror.TargetMirrorController; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; @@ -53,6 +53,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreLargeMessageReader; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreMessageReader; +import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.protocol.amqp.proton.MessageReader; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver; import org.apache.activemq.artemis.utils.ByteUtil; @@ -77,8 +78,11 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_QUEUE; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY; +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.NO_FORWARD; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID; +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_NO_FORWARD; +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_NO_FORWARD_SOURCE; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY; @@ -86,20 +90,27 @@ import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_LARGE_MESSAGE_FORMAT; import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT; -public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements MirrorController { +public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements TargetMirrorController { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private static final ThreadLocal CONTROLLER_THREAD_LOCAL = new ThreadLocal<>(); + private static final ThreadLocal CONTROLLER_THREAD_LOCAL = new ThreadLocal<>(); - public static void setControllerInUse(MirrorController controller) { + public static void setControllerInUse(TargetMirrorController controller) { CONTROLLER_THREAD_LOCAL.set(controller); } - public static MirrorController getControllerInUse() { + public static TargetMirrorController getControllerInUse() { return CONTROLLER_THREAD_LOCAL.get(); } + private boolean noMessageForwarding = false; + + @Override + public boolean isNoForward() { + return noMessageForwarding; + } + /** * Objects of this class can be used by either transaction or by OperationContext. * It is important that when you're using the transactions you clear any references to @@ -248,6 +259,7 @@ public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI, this.configuration = server.getConfiguration(); this.referenceNodeStore = sessionSPI.getProtocolManager().getReferenceIDSupplier(); mirrorContext = protonSession.getSessionSPI().getSessionContext(); + this.noMessageForwarding = AmqpSupport.verifyDesiredCapability(receiver, NO_FORWARD); } @Override @@ -534,6 +546,10 @@ private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotat message.setBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY, internalID); message.setBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY, internalMirrorID); + if (noMessageForwarding) { + message.setBrokerProperty(INTERNAL_NO_FORWARD, true); + message.setBrokerProperty(INTERNAL_NO_FORWARD_SOURCE, getRemoteMirrorId()); + } if (internalAddress != null) { message.setAddress(internalAddress); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java index 0ef1a9b6497..b5138420e4d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java @@ -54,7 +54,7 @@ import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.AddressInfo; -import org.apache.activemq.artemis.core.server.mirror.MirrorController; +import org.apache.activemq.artemis.core.server.mirror.TargetMirrorController; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger; @@ -237,7 +237,7 @@ private boolean isEmpty(LongObjectHashMap> acksToRetry) { - MirrorController previousController = AMQPMirrorControllerTarget.getControllerInUse(); + TargetMirrorController previousController = AMQPMirrorControllerTarget.getControllerInUse(); logger.trace("retrying address {} on server {}", address, server); try { AMQPMirrorControllerTarget.setControllerInUse(disabledAckMirrorController); @@ -518,7 +518,7 @@ private void deliveryAsync(JournalHashMap map) { - private static class DisabledAckMirrorController implements MirrorController { + private static class DisabledAckMirrorController implements TargetMirrorController { @Override public boolean isRetryACK() { @@ -564,5 +564,10 @@ public void preAcknowledge(Transaction tx, MessageReference ref, AckReason reaso public String getRemoteMirrorId() { return null; } + + @Override + public boolean isNoForward() { + return false; + } } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorTransaction.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorTransaction.java index 114cf9ad6f8..d825beb648c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorTransaction.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorTransaction.java @@ -20,7 +20,7 @@ import org.apache.activemq.artemis.core.io.OperationConsistencyLevel; import org.apache.activemq.artemis.core.persistence.StorageManager; -import org.apache.activemq.artemis.core.server.mirror.MirrorController; +import org.apache.activemq.artemis.core.server.mirror.TargetMirrorController; import org.apache.activemq.artemis.core.transaction.TransactionOperation; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.slf4j.Logger; @@ -34,7 +34,7 @@ public class MirrorTransaction extends TransactionImpl { boolean allowPageTransaction; - MirrorController controlInUse; + TargetMirrorController controlInUse; public MirrorTransaction(StorageManager storageManager) { super(storageManager); @@ -44,7 +44,7 @@ public MirrorTransaction(StorageManager storageManager) { @Override protected synchronized void afterCommit(List operationsToComplete) { - MirrorController beforeController = AMQPMirrorControllerTarget.getControllerInUse(); + TargetMirrorController beforeController = AMQPMirrorControllerTarget.getControllerInUse(); AMQPMirrorControllerTarget.setControllerInUse(controlInUse); try { super.afterCommit(operationsToComplete); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index 17b54cf15b2..a7a93cd2f94 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton; import java.net.URI; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -458,14 +459,19 @@ private void handleReplicaTargetLinkOpened(AMQPSessionContext protonSession, Rec return; } + ArrayList offeredCapabilitiesList = new ArrayList<>(); + offeredCapabilitiesList.add(AMQPMirrorControllerSource.MIRROR_CAPABILITY); // We need to check if the remote desires to send us tunneled core messages or not, and if // we support that we need to offer that back so it knows it can actually do core tunneling. if (verifyDesiredCapability(receiver, AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT)) { - receiver.setOfferedCapabilities(new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY, - AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT}); - } else { - receiver.setOfferedCapabilities(new Symbol[]{AMQPMirrorControllerSource.MIRROR_CAPABILITY}); + offeredCapabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT); + } + + // If the remote wants us to not forward any messages to other mirrors we need to offer that capability + if (verifyDesiredCapability(receiver, AMQPMirrorControllerSource.NO_FORWARD)) { + offeredCapabilitiesList.add(AMQPMirrorControllerSource.NO_FORWARD); } + receiver.setOfferedCapabilities((Symbol[]) offeredCapabilitiesList.toArray(new Symbol[]{})); protonSession.addReplicaTarget(receiver); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java index 944099c2a00..daf1be233ed 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java @@ -30,6 +30,8 @@ public class AMQPMirrorBrokerConnectionElement extends AMQPBrokerConnectionEleme boolean queueCreation = true; + boolean noForward = false; + boolean queueRemoval = true; boolean messageAcknowledgements = true; @@ -75,6 +77,15 @@ public AMQPMirrorBrokerConnectionElement setQueueCreation(boolean queueCreation) return this; } + public boolean getNoForward() { + return noForward; + } + + public AMQPMirrorBrokerConnectionElement setNoForward(boolean noForward) { + this.noForward = noForward; + return this; + } + public boolean isQueueRemoval() { return queueRemoval; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 6819b307ade..ea9656b399c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -2194,10 +2194,11 @@ private void parseAMQPBrokerConnections(final Element e, boolean durable = getBooleanAttribute(e2, "durable", true); boolean queueRemoval = getBooleanAttribute(e2, "queue-removal", true); boolean sync = getBooleanAttribute(e2, "sync", false); + boolean noForward = !getBooleanAttribute(e2, "no-forward", false); String addressFilter = getAttributeValue(e2, "address-filter"); AMQPMirrorBrokerConnectionElement amqpMirrorConnectionElement = new AMQPMirrorBrokerConnectionElement(); - amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter).setSync(sync); + amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter).setSync(sync).setNoForward(noForward); connectionElement = amqpMirrorConnectionElement; connectionElement.setType(AMQPBrokerConnectionAddressType.MIRROR); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/TargetMirrorController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/TargetMirrorController.java new file mode 100644 index 00000000000..bbddf19571c --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/TargetMirrorController.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.mirror; + +public interface TargetMirrorController extends MirrorController { + boolean isNoForward(); +} diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index afd809ad1a9..4d0e01c5994 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -2306,6 +2306,14 @@ + + + + If this is true, the mirror at the opposite end of the link will not forward messages or instructions coming from this broker to any other mirrors down the line. + This is false by default. + + + diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java index 9796fe00417..2a2a920477f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java @@ -20,6 +20,7 @@ import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT; import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.CONNECTION_FORCED; import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.TUNNEL_CORE_MESSAGES; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.fail; @@ -27,6 +28,7 @@ import java.lang.invoke.MethodHandles; import java.net.URI; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -43,11 +45,14 @@ import javax.jms.Topic; import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory; import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; @@ -186,6 +191,43 @@ public void testBrokerHandlesSenderLinkOmitsMirrorCapability() throws Exception } } + @Test + @Timeout(20) + public void testBrokerHandlesSenderLinkOmitsNoForwardCapability() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS"); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender() + .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR")) + .withDesiredCapabilities("amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString(), AMQPMirrorControllerSource.NO_FORWARD.toString()) + .respond() + .withOfferedCapabilities("amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()); + peer.expectClose().withError(CONNECTION_FORCED.toString()).optional(); // Can hit the wire in rare instances. + peer.expectConnectionToDrop(); + peer.start(); + + final URI remoteURI = peer.getServerURI(); + logger.info("Connect test started, peer listening on: {}", remoteURI); + + try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) { + // No user or pass given, it will have to select ANONYMOUS even though PLAIN also offered + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setNoForward(true)); + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); + + Wait.assertTrue(() -> loggerHandler.findText("AMQ111001")); + assertEquals(1, loggerHandler.countText("AMQ119018")); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + server.stop(); + } + } + } + @Test @Timeout(20) public void testBrokerAddsAddressAndQueue() throws Exception { @@ -229,6 +271,121 @@ public void testBrokerAddsAddressAndQueue() throws Exception { } } + + @Test + @Timeout(20) + public void testNoForwardBlocksMessagesAndControlsPropagation() throws Exception { + final Map brokerProperties = new HashMap<>(); + brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker"); + + // Topology of the test: server -(noForward)-> server_2 -> peer_3 + try (ProtonTestServer peer_3 = new ProtonTestServer()) { + peer_3.expectSASLPlainConnect("user", "pass", "PLAIN", "ANONYMOUS"); + peer_3.expectOpen().respond(); + peer_3.expectBegin().respond(); + peer_3.expectAttach().ofSender() + .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR")) + .withDesiredCapabilities("amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()) + .respond() + .withOfferedCapabilities("amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()) + .withPropertiesMap(brokerProperties); + peer_3.remoteFlow().withLinkCredit(10).queue(); + peer_3.start(); + + final URI remoteURI = peer_3.getServerURI(); + logger.info("Connect test started, peer listening on: {}", remoteURI); + + final int AMQP_PORT_2 = BROKER_PORT_NUM + 1; + final ActiveMQServer server_2 = createServer(AMQP_PORT_2, false); + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toPeer3", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.setUser("user"); + amqpConnection.setPassword("pass"); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setQueueCreation(true).setAddressFilter(getQueueName() + ",sometest")); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer", "tcp://localhost:" + BROKER_PORT_NUM); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName() + ",sometest")); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer2", "tcp://localhost:" + AMQP_PORT_2); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setQueueCreation(true).setAddressFilter(getQueueName()).setNoForward(true)); + server.getConfiguration().addAMQPConnection(amqpConnection); + } + + // connect the topology + server_2.start(); + peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS); + server.start(); + + // Create queues & send messages on server, nothing will reach peer_3 + createAddressAndQueues(server); + Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null); + + final org.apache.activemq.artemis.core.server.Queue q1 = server.locateQueue(getQueueName()); + assertNotNull(q1); + + final org.apache.activemq.artemis.core.server.Queue q2 = server_2.locateQueue(getQueueName()); + assertNotNull(q2); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + BROKER_PORT_NUM); + try (Connection conn = factory.createConnection()) { + final Session session = conn.createSession(); + conn.start(); + + final MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + producer.send(session.createTextMessage("message")); + producer.close(); + + org.apache.activemq.artemis.utils.Wait.assertEquals(1L, q1::getMessageCount, 100, 100); + org.apache.activemq.artemis.utils.Wait.assertEquals(1L, q2::getMessageCount, 100, 100); + + final MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + TextMessage message = (TextMessage) consumer.receive(100); + consumer.close(); + + org.apache.activemq.artemis.utils.Wait.assertEquals(0L, q2::getMessageCount, 100, 100); + org.apache.activemq.artemis.utils.Wait.assertEquals(0L, q1::getMessageCount, 100, 100); + + assertNotNull(message); + assertEquals("message", message.getText()); + } + + // give some time to peer_3 to receive messages (if any) + Thread.sleep(100); + peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS); // if messages are received here, this should error out + + // Then send messages on the broker directly connected to the peer, the messages should make it to the peer. + // Receiving these 3 messages in that order confirms that no previous data reched the Peer, therefore validating + // the test. + peer_3.expectTransfer().accept(); // Address create + peer_3.expectTransfer().accept(); // Queue create + peer_3.expectTransfer().withMessageFormat(AMQP_TUNNELED_CORE_MESSAGE_FORMAT).accept(); // Producer Message + + server_2.addAddressInfo(new AddressInfo(SimpleString.of("sometest"), RoutingType.ANYCAST)); + server_2.createQueue(QueueConfiguration.of("sometest").setRoutingType(RoutingType.ANYCAST)); + + final ConnectionFactory factory_2 = CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + AMQP_PORT_2); + try (Connection connection = factory_2.createConnection()) { + final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + final MessageProducer producer = session.createProducer(session.createQueue("sometest")); + final TextMessage message = session.createTextMessage("test"); + + connection.start(); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + producer.send(message); + } + + peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS); + + server.stop(); + server_2.stop(); + } + } + @Test @Timeout(20) public void testCreateDurableConsumerReplicatesAddressAndQueue() throws Exception { @@ -349,20 +506,41 @@ public void testProducerMessageIsMirroredWithoutCoreTunnelingUsesDefaultMessageF doTestProducerMessageIsMirroredWithCorrectMessageFormat(false); } + @Test + @Timeout(20) + public void testProducerMessageIsMirroredWithNoForwardAndTunneling() throws Exception { + doTestProducerMessageIsMirroredWithCorrectMessageFormat(true, true); + } + + @Test + @Timeout(20) + public void testProducerMessageIsMirroredWithNoForwardAndTunelingAndWithoutTunneling() throws Exception { + doTestProducerMessageIsMirroredWithCorrectMessageFormat(false, true); + } + private void doTestProducerMessageIsMirroredWithCorrectMessageFormat(boolean tunneling) throws Exception { + doTestProducerMessageIsMirroredWithCorrectMessageFormat(tunneling, false); + } + + private void doTestProducerMessageIsMirroredWithCorrectMessageFormat(boolean tunneling, boolean noForward) throws Exception { final Map brokerProperties = new HashMap<>(); brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker"); final String[] capabilities; + ArrayList capabilitiesList = new ArrayList<>(); final int messageFormat; + capabilitiesList.add("amq.mirror"); if (tunneling) { - capabilities = new String[] {"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()}; + capabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()); messageFormat = AMQP_TUNNELED_CORE_MESSAGE_FORMAT; } else { - capabilities = new String[] {"amq.mirror"}; messageFormat = 0; // AMQP default } + if (noForward) { + capabilitiesList.add(AMQPMirrorControllerSource.NO_FORWARD.toString()); + } + capabilities = capabilitiesList.toArray(new String[]{}); try (ProtonTestServer peer = new ProtonTestServer()) { peer.expectSASLPlainConnect("user", "pass", "PLAIN", "ANONYMOUS"); @@ -387,6 +565,7 @@ private void doTestProducerMessageIsMirroredWithCorrectMessageFormat(boolean tun AMQPMirrorBrokerConnectionElement mirrorElement = new AMQPMirrorBrokerConnectionElement(); mirrorElement.addProperty(TUNNEL_CORE_MESSAGES, Boolean.toString(tunneling)); mirrorElement.setQueueCreation(true); + mirrorElement.setNoForward(noForward); AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSquareMirroringTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSquareMirroringTest.java new file mode 100644 index 00000000000..e1d4d034d89 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSquareMirroringTest.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp.connect; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class AMQPSquareMirroringTest extends AmqpClientTestSupport { + + protected static final int AMQP_PORT_2 = 5673; + protected static final int AMQP_PORT_3 = 5674; + protected static final int AMQP_PORT_4 = 5675; + + ActiveMQServer server_2; + ActiveMQServer server_3; + ActiveMQServer server_4; + + @Override + protected ActiveMQServer createServer() throws Exception { + return createServer(AMQP_PORT, false); + } + + protected String getConfiguredProtocols() { + return "AMQP,CORE,OPENWIRE"; + } + + @Test + public void testSquare() throws Exception { + server_2 = createServer(AMQP_PORT_2, false); + server_3 = createServer(AMQP_PORT_3, false); + server_4 = createServer(AMQP_PORT_4, false); + + // name the servers, for convenience during debugging + server.getConfiguration().setName("1"); + server_2.getConfiguration().setName("2"); + server_3.getConfiguration().setName("3"); + server_4.getConfiguration().setName("4"); + + /** + * + * Setup the mirroring topology to be a square: + * + * 1 <- - -> 2 + * ^ ^ The link between 1 and 2 and the + * | | link between 3 and 4 are noForward + * v v links in both directions. + * 4 <- - -> 3 + */ + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "1to2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setNoForward(true)); + server.getConfiguration().addAMQPConnection(amqpConnection); + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "1to4", "tcp://localhost:" + AMQP_PORT_4).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); + server.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "2to1", "tcp://localhost:" + AMQP_PORT).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setNoForward(true)); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "2to3", "tcp://localhost:" + AMQP_PORT_3).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "3to2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); + server_3.getConfiguration().addAMQPConnection(amqpConnection); + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "3to4", "tcp://localhost:" + AMQP_PORT_4).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setNoForward(true)); + server_3.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "4to1", "tcp://localhost:" + AMQP_PORT).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); + server_4.getConfiguration().addAMQPConnection(amqpConnection); + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "4to3", "tcp://localhost:" + AMQP_PORT_3).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setNoForward(true)); + server_4.getConfiguration().addAMQPConnection(amqpConnection); + } + + server.start(); + server_2.start(); + server_3.start(); + server_4.start(); + + createAddressAndQueues(server); + Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server_3.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server_4.locateQueue(getQueueName()) != null); + + Queue q1 = server.locateQueue(getQueueName()); + assertNotNull(q1); + + Queue q2 = server.locateQueue(getQueueName()); + assertNotNull(q2); + + Queue q3 = server.locateQueue(getQueueName()); + assertNotNull(q3); + + Queue q4 = server.locateQueue(getQueueName()); + assertNotNull(q4); + + ConnectionFactory factory = CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + AMQP_PORT); + ConnectionFactory factory2 = CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + AMQP_PORT_2); + ConnectionFactory factory3 = CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + AMQP_PORT_3); + ConnectionFactory factory4 = CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + AMQP_PORT_4); + + try (Connection conn = factory4.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + try (Connection conn = factory3.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 10; i < 20; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 20; i < 30; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + try (Connection conn = factory.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 30; i < 40; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + + Thread.sleep(100); // some time to allow eventual loops + + Wait.assertEquals(40L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(40L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(40L, q3::getMessageCount, 1000, 100); + Wait.assertEquals(40L, q4::getMessageCount, 1000, 100); + + try (Connection conn = factory.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + + Wait.assertEquals(30L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(30L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(30L, q3::getMessageCount, 1000, 100); + Wait.assertEquals(30L, q4::getMessageCount, 1000, 100); + + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 10; i < 20; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + + Wait.assertEquals(20L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(20L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(20L, q3::getMessageCount, 1000, 100); + Wait.assertEquals(20L, q4::getMessageCount, 1000, 100); + + try (Connection conn = factory3.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 20; i < 30; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + + Wait.assertEquals(10L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q3::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q4::getMessageCount, 1000, 100); + + try (Connection conn = factory4.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 30; i < 40; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + + Wait.assertEquals(0L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q3::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q4::getMessageCount, 1000, 100); + + try (Connection conn = factory.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + assertNull(consumer.receiveNoWait()); + consumer.close(); + } + + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + assertNull(consumer.receiveNoWait()); + consumer.close(); + } + + try (Connection conn = factory3.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + assertNull(consumer.receiveNoWait()); + consumer.close(); + } + + try (Connection conn = factory4.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + assertNull(consumer.receiveNoWait()); + consumer.close(); + } + + } +} \ No newline at end of file