Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions broker/src/main/java/org/assimbly/broker/Broker.java
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,14 @@ public interface Broker {
*/
public String countMessagesFromList(String endpointNames) throws Exception;

/**
* list of number of messages for each flow
*
* @param excludeEmptyQueues (exclude empty queues from the response)
* @return list of flows and how many messages.
* @throws Exception if list can't be retrieved
*/
public String getFlowMessageCountsList(boolean excludeEmptyQueues) throws Exception;

/**
* list all messages on the broker
Expand Down
58 changes: 51 additions & 7 deletions broker/src/main/java/org/assimbly/broker/impl/ActiveMQArtemis.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,27 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static java.util.Arrays.stream;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.commons.io.FileUtils;
import org.assimbly.broker.Broker;
import org.assimbly.broker.converter.CompositeDataConverter;
Expand All @@ -40,10 +41,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.management.JMX;
import javax.management.MBeanNotificationInfo;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.*;
import javax.management.openmbean.CompositeData;

public class ActiveMQArtemis implements Broker {
Expand Down Expand Up @@ -606,6 +604,14 @@ public String countMessages(String endpointName) throws Exception {

}

public String getFlowMessageCountsList(boolean excludeEmptyQueues) throws Exception {

Map<String, Long> flowIdsMessageCountMap = getFlowIdsMessageCountMap(excludeEmptyQueues);

ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.writeValueAsString(flowIdsMessageCountMap);
}



public String browseMessage(String endpointName, String messageId, boolean excludeBody) throws Exception {
Expand Down Expand Up @@ -658,6 +664,44 @@ public String browseMessages(String endpointName, Integer page, Integer numberOf

}

private Map<String, Long> getFlowIdsMessageCountMap(boolean excludeEmptyQueues) throws MalformedObjectNameException {
Map<String, Long> destinationMessageCounts = new HashMap<>();

try {
ActiveMQServer activeBroker = broker.getActiveMQServer();
// Get all queues names
String[] queueNames = activeBroker.getActiveMQServerControl().getQueueNames();

for (String queueName : queueNames) {
if(!queueName.startsWith("ID_")) {
// discard queues without prefix ID_
continue;
}

// extract flowId
String flowId = queueName.substring(0, Math.min(queueName.length(), 27));
QueueControl queueControl = (QueueControl) activeBroker.getManagementService().getResource(ResourceNames.QUEUE + queueName);

// Get the message count for the current queue
long messageCount = queueControl.getMessageCount();

if(destinationMessageCounts.containsKey(flowId)) {
messageCount += destinationMessageCounts.get(flowId);
}

if(messageCount > 0 || !excludeEmptyQueues) {
// Add queue name and message count to the map
destinationMessageCounts.put(flowId, messageCount);
}
}

} catch (Exception e) {
log.error("Error to get all destinations and messages counts", e);
}

return destinationMessageCounts;
}

public String sendMessage(String endpointName, Map<String,Object> messageHeaders, String messageBody) throws Exception {

checkIfEndpointExist(endpointName);
Expand Down
55 changes: 51 additions & 4 deletions broker/src/main/java/org/assimbly/broker/impl/ActiveMQClassic.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.net.UrlEscapers;
import org.apache.activemq.broker.*;
import org.apache.activemq.broker.jmx.*;
Expand Down Expand Up @@ -570,6 +568,14 @@ public String countMessages(String endpointName) throws Exception {

}

public String getFlowMessageCountsList(boolean excludeEmptyQueues) throws Exception {

Map<String, Long> flowIdsMessageCountMap = getFlowIdsMessageCountMap(endpointType, excludeEmptyQueues);

ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.writeValueAsString(flowIdsMessageCountMap);
}

public String sendMessage(String endpointName, Map<String,Object> messageHeaders, String messageBody) throws Exception {

checkIfEndpointExist(endpointName);
Expand Down Expand Up @@ -717,6 +723,47 @@ public DestinationViewMBean getDestinationViewMBean(String destinationType, Stri
return destinationViewMbean;
}

private Map<String, Long> getFlowIdsMessageCountMap(String destinationType, boolean excludeEmptyQueues) throws MalformedObjectNameException {
Map<String, Long> destinationMessageCounts = new HashMap<>();

try {
// Get all destinations
Set<ObjectName> destinations = broker.getManagementContext().queryNames(new ObjectName("org.apache.activemq:type=Broker,brokerName=" + broker.getBrokerName() + ",destinationType=" + destinationType + ",*"), null);

// Iterate over each destination
for (ObjectName destination : destinations) {
String destinationName = destination.getKeyProperty("destinationName");

if(!destinationName.startsWith("ID_")) {
// discard destination without prefix ID_
continue;
}

// extract flowId
String flowId = destinationName.substring(0, Math.min(destinationName.length(), 27));

// Get the DestinationViewMBean for the current destination
DestinationViewMBean destinationViewMBean = getQueueViewMBean(destinationType, destinationName);

// Get the message count for the current destination
long messageCount = destinationViewMBean.getQueueSize();

if(destinationMessageCounts.containsKey(flowId)) {
messageCount += destinationMessageCounts.get(flowId);
}

if(messageCount > 0 || !excludeEmptyQueues) {
// Add the destination name and message count to the map
destinationMessageCounts.put(flowId, messageCount);
}
}
} catch (Exception e) {
log.error("Error to get all destinations and messages counts", e);
}

return destinationMessageCounts;
}

public QueueViewMBean getQueueViewMBean(String destinationType, String destinationName) throws MalformedObjectNameException {

ObjectName activeMQ = new ObjectName("org.apache.activemq:type=Broker,brokerName=" + broker.getBrokerName() + ",destinationType=" + destinationType + ",destinationName=" + destinationName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,14 @@ public String countMessagesFromList(String brokerType, String endpointNames) thr
return result;
}

public String getFlowMessageCountsList(String brokerType, boolean excludeEmptyQueues) throws Exception {

broker = getBroker(brokerType);
result = broker.getFlowMessageCountsList(excludeEmptyQueues);

return result;
}

public String countMessages(String brokerType, String endpointName) throws Exception {

broker = getBroker(brokerType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,35 @@ public Object countMessagesFromList(@Parameter(hidden = true) @RequestHeader("Ac

}

/**
* GET /brokers/{brokerType}/flows/messages/count : get a list of number of messages for each flow.
*
* @param brokerType, the type of broker: classic or artemis
* @return list of flows with status 200 (OK) or with status 404 (Not Found)
*/
@GetMapping(
path = "/brokers/{brokerType}/flows/message/count",
produces = {MediaType.APPLICATION_JSON_VALUE, MediaType.APPLICATION_XML_VALUE, MediaType.TEXT_PLAIN_VALUE}
)
public Object getFlowsMessageCountList(
@Parameter(hidden = true) @RequestHeader("Accept") String mediaType,
@PathVariable String brokerType,
@RequestParam(value = "excludeEmptyQueues", required = false) boolean excludeEmptyQueues
) throws Exception {

log.debug("REST request to list number of messages for each flow");

try {
result = broker.getFlowMessageCountsList(brokerType, excludeEmptyQueues);

return org.assimbly.util.rest.ResponseUtil.createSuccessResponse(ID, mediaType, "/brokers/{brokerType}/flows/message/count", result);
} catch (Exception e) {
log.error("Can't list messages", e);
return org.assimbly.util.rest.ResponseUtil.createFailureResponse(ID, mediaType, "/brokers/{brokerType}/flows/message/count", e.getMessage());
}

}


/**
* GET /brokers/{brokerType}/messages/{endpointName}/{filter} : get list of messages on endpoint.
Expand Down