Description
openedon Feb 7, 2023
Description
Hi everybody,
Related to the zulip topic, https://quarkusio.zulipchat.com/#narrow/stream/187030-users/topic/Artemis.20jms.20.2B.20smallrye-jms.20connector, we are looking for a smart way to talk with jboss eap6 by jms message (configurable protocol like hornetq core). Initial tests works by the old school way (create factory -> connexion -> session...). With the quarkus-artemis-jms extension, we were able to inject a ConnectionFactory, and produce/consume messages against our hornetq broker. It compile natively.
@Path("/activemqjms2")
@RegisterForReflection(targets = { HornetQClientProtocolManagerFactory.class, ActiveMQConnectionFactory.class })
public class ActiveMQJMS2Resource {
private static final Logger log = LoggerFactory.getLogger(ActiveMQJMS2Resource.class);
@Inject ConnectionFactory factory;
@GET
@Produces(MediaType.TEXT_PLAIN)
public String connection() throws JMSException {
Queue queue = ActiveMQDestination.createQueue(PacketImpl.OLD_QUEUE_PREFIX + QUEUE_NAME);
try (Connection connection = factory.createConnection()) {
int count = 0;
connection.start();
try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
try (MessageProducer producer = session.createProducer(queue)) {
String text = "coucou at " + new Date();
for (int i = 0; i < 10; i++) {
producer.send(session.createTextMessage(text + " - " + i));
}
}
// session.commit();
}
log.info("sent 10 messages");
try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
try (MessageConsumer consumer = session.createConsumer(queue)) {
Message message = consumer.receive(2000);
while (message != null) {
if (message instanceof TextMessage textMessage) {
log.info("received " + textMessage.getText());
} else {
log.info("received " + message);
}
count++;
message = consumer.receive(2000);
}
}
// session.commit();
}
return "received messages: " + count;
}
}
}
<dependency>
<groupId>io.quarkiverse.artemis</groupId>
<artifactId>quarkus-artemis-jms</artifactId>
<version>2.0.4</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-hqclient-protocol</artifactId>
<version>2.26.0</version>
</dependency>
quarkus:
artemis:
url: (tcp://jboss1:5049,tcp://jboss2:5049)?protocolManagerFactoryStr=org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory
username: user
password: pass
Although this is technically working, we would like an improved jms support, such as what is already provided by the smallrye jms connector.
here are the main requirements:
- connection pooling and concurrency configuration (number of messages that can be processed in parallel)
- listener style consumer (as opposed to a timer based approach as in https://quarkus.io/guides/jms#the-price-consumer)
- authentication (that is already working)
- participation in the managed quarkus transaction (single resource manager) - may be it comes for free already
- selectors, with parameterisation if provided through annotation
- distributing consumers on both nodes of the targeted cluster : rebalanceConnections => see https://github.com/apache/activemq-artemis/blob/main/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivationSpec.java#L140 and https://www.wildfly.org/news/2017/10/03/Messaging-features/
XA support is not a requirement.
here is what has been done on SB:
@Component
public class ValidationBrokerListener {
@JmsListener(
destination = "com.x.jee.ValidationBroker.jms.Out.Queue",
selector = "event = 'MyEvent' AND participant = 'X'",
concurrency = "1-10")
@Transactional
public void processMessage(Message message) throws JMSException, InterruptedException {
// ...
}
}
or in the old days of EAP (nothing fancy):
@MessageDriven(name = "MyMDB", activationConfig = {
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "maxSession", propertyValue = "${Messaging.maxSession:3}"),
@ActivationConfigProperty(propertyName = "destination", propertyValue = "com/x/Messaging/MyQueue"),
@ActivationConfigProperty(propertyName = "rebalanceConnections", propertyValue = "true")
})
@ResourceAdapter("${MyRarName}")
public class MyMDB implements MessageListener {
public void onMessage(Message msg) {
...
}
}
Many thanks.
Implementation ideas
No response