From 75de9321162ae096de4a3c0b5a325865d514e5a0 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon" Date: Thu, 15 Feb 2024 17:27:52 -0500 Subject: [PATCH] AMQ-9436 - Ensure message audit in queue store cursor is shared This commit fixes the initialization of the StoreQueueCursor message audit object to make sure it's shared between the persistent and non persistent cursors. It also adds a check to ensure that duplicate calls to start will not try and init more than once. --- .../broker/region/cursors/StoreQueueCursor.java | 8 ++++++-- .../region/QueueDuplicatesFromStoreTest.java | 15 +++++++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java index a7b4c6ec9f9..7e877fa2df5 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java @@ -53,7 +53,9 @@ public StoreQueueCursor(Broker broker,Queue queue) { @Override public synchronized void start() throws Exception { - started = true; + if (isStarted()) { + return; + } super.start(); if (nonPersistent == null) { if (broker.getBrokerService().isPersistent()) { @@ -76,7 +78,9 @@ public synchronized void start() throws Exception { @Override public synchronized void stop() throws Exception { - started = false; + if (!isStarted()) { + return; + } if (nonPersistent != null) { nonPersistent.destroy(); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java index 038bb4039d9..dd9bcc5aca0 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java @@ -29,10 +29,13 @@ import junit.framework.TestCase; +import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.SubscriptionStatistics; +import org.apache.activemq.broker.region.cursors.PendingMessageCursor; +import org.apache.activemq.broker.region.cursors.StoreQueueCursor; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; @@ -117,6 +120,18 @@ public void doTestNoDuplicateAfterCacheFullAndAcked(final int auditDepth) throws queue.initialize(); queue.start(); + // verify that the cursor message audit is created and set with the + // correct audit depth and shared with the persistent and non peristent + // cursors + final StoreQueueCursor messages = (StoreQueueCursor) queue.getMessages(); + ActiveMQMessageAudit messageAudit = messages.getMessageAudit(); + assertNotNull(messageAudit); + assertEquals(auditDepth, messageAudit.getAuditDepth()); + assertSame(messageAudit, messages.getPersistent().getMessageAudit()); + assertSame(messageAudit, messages.getNonPersistent().getMessageAudit()); + // Verify calling start again doesn't re-initial the audit + messages.start(); + assertSame(messageAudit, messages.getMessageAudit()); ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); ProducerInfo producerInfo = new ProducerInfo();