Skip to content

Commit

Permalink
ARTEMIS-5063 messageMoved addition in ActiveMQServerMessagePlugin
Browse files Browse the repository at this point in the history
  • Loading branch information
Jean-Pascal Briquet authored and clebertsuconic committed Sep 26, 2024
1 parent 0c4c054 commit 8fc6f09
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3665,6 +3665,10 @@ private RoutingStatus move(final Transaction originalTX,
tx.commit();
}

if (server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.messageMoved(tx, ref, reason, address, queueID, consumer, copyMessage, routingStatus));
}

return routingStatus;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,4 +290,28 @@ default void messageAcknowledged(Transaction tx, MessageReference ref, AckReason
//by default call the old method for backwards compatibility
this.messageAcknowledged(ref, reason, consumer);
}

/**
* A message has been moved
*
* @param tx The transaction associated with the move
* @param ref The ref of the message moved
* @param reason The move reason
* @param destAddress the destination address for the move operation
* @param destQueueID the destination queueID for the move operation - this field is optional and can be null
* @param consumer the consumer that moved the message - this field is optional and can be null
* @param newMessage the new message created by the move operation
* @param result routing status of the move operation
* @throws ActiveMQException
*/
default void messageMoved(final Transaction tx,
final MessageReference ref,
final AckReason reason,
final SimpleString destAddress,
final Long destQueueID,
final ServerConsumer consumer,
final Message newMessage,
final RoutingStatus result) throws ActiveMQException {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -96,6 +97,7 @@
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_UPDATE_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_MOVED;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand Down Expand Up @@ -412,6 +414,35 @@ public void testUpdateAddress() throws Exception {
verifier.validatePluginMethodsEquals(1, BEFORE_UPDATE_ADDRESS, AFTER_UPDATE_ADDRESS);
}

@Test
public void testMessageMoved() throws Exception {
final String queue1Name = "queue1";
final String queue2Name = "queue2";
createQueue(queue2Name);
org.apache.activemq.artemis.core.server.Queue artemisQueue = server.locateQueue(queue1Name);
org.apache.activemq.artemis.core.server.Queue artemisQueue2 = server.locateQueue(queue2Name);

conn = cf.createConnection();
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

MessageProducer prod = sess.createProducer(queue);

byte[] msgs = new byte[1024];
for (int i = 0; i < msgs.length; i++) {
msgs[i] = RandomUtil.randomByte();
}

TextMessage msg1 = sess.createTextMessage(new String(msgs));
prod.send(msg1);
conn.close();

artemisQueue.moveReferences(null, artemisQueue2.getAddress(), null);
Wait.assertEquals(1L, artemisQueue2::getMessageCount, 2000, 100);

verifier.validatePluginMethodsEquals(1, MESSAGE_MOVED);
}

private class AckPluginVerifier implements ActiveMQServerPlugin {

private BiConsumer<ServerConsumer, AckReason> assertion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
public static final String AFTER_REMOVE_BINDING = "afterRemoveBinding";
public static final String MESSAGE_EXPIRED = "messageExpired";
public static final String MESSAGE_ACKED = "messageAcknowledged";
public static final String MESSAGE_MOVED = "messageMoved";
public static final String BEFORE_SEND = "beforeSend";
public static final String AFTER_SEND = "afterSend";
public static final String ON_SEND_EXCEPTION = "onSendException";
Expand Down Expand Up @@ -299,6 +300,23 @@ public void messageAcknowledged(MessageReference ref, AckReason reason, ServerCo
methodCalled(MESSAGE_ACKED);
}

@Override
public void messageMoved(final Transaction tx,
final MessageReference ref,
final AckReason reason,
final SimpleString destAddress,
final Long destQueueID,
final ServerConsumer consumer,
final Message newMessage,
final RoutingStatus result) {
Objects.requireNonNull(ref);
Objects.requireNonNull(reason);
Objects.requireNonNull(destAddress);
Objects.requireNonNull(newMessage);
Objects.requireNonNull(result);
methodCalled(MESSAGE_MOVED);
}

@Override
public void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct,
boolean noAutoCreateQueue) {
Expand Down

0 comments on commit 8fc6f09

Please sign in to comment.