Skip to content

Commit

Permalink
AMQ-9436 - Ensure message audit in queue store cursor is shared
Browse files Browse the repository at this point in the history
This commit fixes the initialization of the StoreQueueCursor message
audit object to make sure it's shared between the persistent and non
persistent cursors. It also adds a check to ensure that duplicate calls
to start will not try and init more than once.
  • Loading branch information
cshannon committed Feb 15, 2024
1 parent 30d54c4 commit 75de932
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ public StoreQueueCursor(Broker broker,Queue queue) {

@Override
public synchronized void start() throws Exception {
started = true;
if (isStarted()) {
return;
}
super.start();
if (nonPersistent == null) {
if (broker.getBrokerService().isPersistent()) {
Expand All @@ -76,7 +78,9 @@ public synchronized void start() throws Exception {

@Override
public synchronized void stop() throws Exception {
started = false;
if (!isStarted()) {
return;
}
if (nonPersistent != null) {
nonPersistent.destroy();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@

import junit.framework.TestCase;

import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.SubscriptionStatistics;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
Expand Down Expand Up @@ -117,6 +120,18 @@ public void doTestNoDuplicateAfterCacheFullAndAcked(final int auditDepth) throws
queue.initialize();
queue.start();

// verify that the cursor message audit is created and set with the
// correct audit depth and shared with the persistent and non peristent
// cursors
final StoreQueueCursor messages = (StoreQueueCursor) queue.getMessages();
ActiveMQMessageAudit messageAudit = messages.getMessageAudit();
assertNotNull(messageAudit);
assertEquals(auditDepth, messageAudit.getAuditDepth());
assertSame(messageAudit, messages.getPersistent().getMessageAudit());
assertSame(messageAudit, messages.getNonPersistent().getMessageAudit());
// Verify calling start again doesn't re-initial the audit
messages.start();
assertSame(messageAudit, messages.getMessageAudit());

ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
ProducerInfo producerInfo = new ProducerInfo();
Expand Down

0 comments on commit 75de932

Please sign in to comment.