Skip to content

Commit

Permalink
ARTEMIS-5037: option to limit mirror propagation
Browse files Browse the repository at this point in the history
Add a new option in the Mirror settings to prevent a broker from
propagating messages.

When working with a topology where 4 nodes are forming a square and
where each node in that square mirrors its two neighbors: a message
leaving a corner can reach the opposite corner of the square by two
different routes. This is causing the message ordering to get broken.

example:
1 <-> 2
^     ^
|     |
v     v
4 <-> 3

A message from 1 will reach 3 by 2 and 4. Message duplication checks
will prevent the message from being duplicated but won't help regarding
the order of the messages. This is because a either the route by 2 or 4
can be faster than the other, so whomever wins the race sets the message
first.

Fixing the example:
Using the new option to not forward messages coming from a link, we
break the possibilities to have two routes to reach the opposite corner.

The above example is updated as followed:
* 2 never forwards messages coming from 1
* 1 never forwards messages coming from 2
* 3 never forwards messages coming from 4
* 4 never forwards messages coming from 3

Now, when a messages leaves 1:
* it reaches 2 and stops there
* it reaches 4
* it reaches 3 through 4 and stops there

Now, when a messages leaves 2:
* it reaches 1 and stops there
* it reaches 3
* it reaches 4 through 3 and stops there

Now, when a messages leaves 3:
* it reaches 4 and stops there
* it reaches 2
* it reaches 1 through 2 and stops there

Now, when a messages leaves 4:
* it reaches 3 and stops there
* it reaches 1
* it reaches 2 through 1 and stops there

The new test AMQPSquareMirroringTest.java is testing this exact setup.
  • Loading branch information
lavocatt committed Sep 23, 2024
1 parent 4a80671 commit 946cb19
Show file tree
Hide file tree
Showing 8 changed files with 350 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.activemq.artemis.protocol.amqp.connect;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -430,16 +431,19 @@ private void doConnect() {
final Queue queue = server.locateQueue(getMirrorSNF(replica));

final boolean coreTunnelingEnabled = isCoreMessageTunnelingEnabled(replica);
final Symbol[] desiredCapabilities;

ArrayList<Symbol> desiredCapabilitiesList = new ArrayList<>();
desiredCapabilitiesList.add(AMQPMirrorControllerSource.MIRROR_CAPABILITY);
if (coreTunnelingEnabled) {
desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY,
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT};
} else {
desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};
desiredCapabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT);
}
if (!replica.getCanForwardMessages()) {
desiredCapabilitiesList.add(AMQPMirrorControllerSource.NO_FORWARD);
}

final Symbol[] requiredOfferedCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};
final Symbol[] desiredCapabilities = (Symbol[]) desiredCapabilitiesList.toArray(new Symbol[]{});

final Symbol[] requiredOfferedCapabilities = replica.getCanForwardMessages() ? new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY} : new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY, AMQPMirrorControllerSource.NO_FORWARD};

connectSender(queue,
queue.getName().toString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,11 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
// Capabilities
public static final Symbol MIRROR_CAPABILITY = Symbol.getSymbol("amq.mirror");
public static final Symbol QPID_DISPATCH_WAYPOINT_CAPABILITY = Symbol.valueOf("qd.waypoint");
public static final Symbol NO_FORWARD = Symbol.getSymbol("amq.no.forward");

public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.of(INTERNAL_ID.toString());
public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.of(BROKER_ID.toString());
public static final SimpleString INTERNAL_NO_FORWARD = SimpleString.of(NO_FORWARD.toString());

private static final ThreadLocal<RoutingContext> mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null));

Expand Down Expand Up @@ -301,6 +303,10 @@ private boolean invalidTarget(MirrorController controller) {
return controller != null && sameNode(getRemoteMirrorId(), controller.getRemoteMirrorId());
}

private boolean isBlockedByNoForward(Message message) {
return Boolean.TRUE.equals(message.getBrokerProperty(INTERNAL_NO_FORWARD));
}

private boolean ignoreAddress(SimpleString address) {
if (address.startsWith(server.getConfiguration().getManagementAddress())) {
return true;
Expand Down Expand Up @@ -344,6 +350,11 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context)
return;
}

if (isBlockedByNoForward(message)) {
logger.trace("sendMessage::server {} is discarding the message because its source is setting a noForward policy", server);
return;
}

logger.trace("sendMessage::{} send message {}", server, message);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.pools.MpscPool;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
Expand All @@ -77,8 +78,10 @@
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_QUEUE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.NO_FORWARD;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_NO_FORWARD;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
Expand All @@ -92,6 +95,8 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement

private static final ThreadLocal<MirrorController> CONTROLLER_THREAD_LOCAL = new ThreadLocal<>();

private boolean noMessageForwarding = false;

public static void setControllerInUse(MirrorController controller) {
CONTROLLER_THREAD_LOCAL.set(controller);
}
Expand Down Expand Up @@ -248,6 +253,13 @@ public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
this.configuration = server.getConfiguration();
this.referenceNodeStore = sessionSPI.getProtocolManager().getReferenceIDSupplier();
mirrorContext = protonSession.getSessionSPI().getSessionContext();
if (receiver.getRemoteDesiredCapabilities() != null) {
for (Symbol capability : receiver.getRemoteDesiredCapabilities()) {
if (capability == NO_FORWARD) {
this.noMessageForwarding = true;
}
}
}
}

@Override
Expand Down Expand Up @@ -534,6 +546,9 @@ private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotat

message.setBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY, internalID);
message.setBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY, internalMirrorID);
if (noMessageForwarding) {
message.setBrokerProperty(INTERNAL_NO_FORWARD, true);
}

if (internalAddress != null) {
message.setAddress(internalAddress);
Expand Down Expand Up @@ -590,4 +605,4 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context)
// Do nothing
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.activemq.artemis.protocol.amqp.proton;

import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -458,14 +459,19 @@ private void handleReplicaTargetLinkOpened(AMQPSessionContext protonSession, Rec
return;
}

ArrayList<Symbol> offeredCapabilitiesList = new ArrayList<>();
offeredCapabilitiesList.add(AMQPMirrorControllerSource.MIRROR_CAPABILITY);
// We need to check if the remote desires to send us tunneled core messages or not, and if
// we support that we need to offer that back so it knows it can actually do core tunneling.
if (verifyDesiredCapability(receiver, AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT)) {
receiver.setOfferedCapabilities(new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY,
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT});
} else {
receiver.setOfferedCapabilities(new Symbol[]{AMQPMirrorControllerSource.MIRROR_CAPABILITY});
offeredCapabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT);
}

// If the remote wants us to not forward any messages to other mirrors we need to offer that capability
if (verifyDesiredCapability(receiver, AMQPMirrorControllerSource.NO_FORWARD)){
offeredCapabilitiesList.add(AMQPMirrorControllerSource.NO_FORWARD);
}
receiver.setOfferedCapabilities((Symbol[]) offeredCapabilitiesList.toArray(new Symbol[]{}));

protonSession.addReplicaTarget(receiver);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class AMQPMirrorBrokerConnectionElement extends AMQPBrokerConnectionEleme

boolean queueCreation = true;

boolean canForwardMessages = true;

boolean queueRemoval = true;

boolean messageAcknowledgements = true;
Expand Down Expand Up @@ -75,6 +77,15 @@ public AMQPMirrorBrokerConnectionElement setQueueCreation(boolean queueCreation)
return this;
}

public boolean getCanForwardMessages() {
return canForwardMessages;
}

public AMQPMirrorBrokerConnectionElement setCanForwardMessages(boolean canForwardMessages) {
this.canForwardMessages = canForwardMessages;
return this;
}

public boolean isQueueRemoval() {
return queueRemoval;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2191,10 +2191,11 @@ private void parseAMQPBrokerConnections(final Element e,
boolean durable = getBooleanAttribute(e2, "durable", true);
boolean queueRemoval = getBooleanAttribute(e2, "queue-removal", true);
boolean sync = getBooleanAttribute(e2, "sync", false);
boolean canForwardMessages = !getBooleanAttribute(e2, "no-messages-forwarding", false);
String addressFilter = getAttributeValue(e2, "address-filter");

AMQPMirrorBrokerConnectionElement amqpMirrorConnectionElement = new AMQPMirrorBrokerConnectionElement();
amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter).setSync(sync);
amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter).setSync(sync).setCanForwardMessages(canForwardMessages);
connectionElement = amqpMirrorConnectionElement;
connectionElement.setType(AMQPBrokerConnectionAddressType.MIRROR);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2298,6 +2298,14 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="no-message-forwarding" type="xsd:boolean" use="optional" default="false">
<xsd:annotation>
<xsd:documentation>
If this is true, the mirror at the opposite end of the link will not forward messages coming from that link to any other mirrors down the line.
This is false by default.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="address-filter" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation>
Expand Down
Loading

0 comments on commit 946cb19

Please sign in to comment.