Reactive Messaging JMS connector support #30943

Open

Description

Hi everybody,

Following up on the Zulip topic https://quarkusio.zulipchat.com/#narrow/stream/187030-users/topic/Artemis.20jms.20.2B.20smallrye-jms.20connector, we are looking for a clean way to exchange JMS messages with JBoss EAP 6 over a configurable protocol such as HornetQ core. Our initial tests work the old-school way (create a factory -> connection -> session...). With the quarkus-artemis-jms extension, we were able to inject a ConnectionFactory and produce/consume messages against our HornetQ broker. It also compiles natively.

@Path("/activemqjms2")
@RegisterForReflection(targets = { HornetQClientProtocolManagerFactory.class, ActiveMQConnectionFactory.class })
public class ActiveMQJMS2Resource {

    private static final Logger log = LoggerFactory.getLogger(ActiveMQJMS2Resource.class);     
    
    @Inject ConnectionFactory factory;     

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String connection() throws JMSException {
        
        Queue queue = ActiveMQDestination.createQueue(PacketImpl.OLD_QUEUE_PREFIX + QUEUE_NAME);
        
        try (Connection connection = factory.createConnection()) {
            int count = 0;
            connection.start();
            try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
                try (MessageProducer producer = session.createProducer(queue)) {
                    String text = "coucou at " + new Date();
                    for (int i = 0; i < 10; i++) {
                        producer.send(session.createTextMessage(text + " - " + i));
                    }
                }
                // session.commit();
            }
            log.info("sent 10 messages");
            try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
                try (MessageConsumer consumer = session.createConsumer(queue)) {
                    Message message = consumer.receive(2000);
                    while (message != null) {
                        if (message instanceof TextMessage textMessage) {
                            log.info("received " + textMessage.getText());
                        } else {
                            log.info("received " + message);
                        }
                        count++;
                        message = consumer.receive(2000);
                    }
                }
                // session.commit();
            }
            return "received messages: " + count;
        }
    }
}

<dependency>
    <groupId>io.quarkiverse.artemis</groupId>
    <artifactId>quarkus-artemis-jms</artifactId>
    <version>2.0.4</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>artemis-hqclient-protocol</artifactId>
    <version>2.26.0</version>
</dependency>

quarkus:
  artemis:
    url: (tcp://jboss1:5049,tcp://jboss2:5049)?protocolManagerFactoryStr=org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory
    username: user
    password: pass

Although this technically works, we would like better JMS support, similar to what the SmallRye JMS connector already provides.

Here are the main requirements:

XA support is not a requirement.

Here is what we have done with Spring Boot:

@Component
public class ValidationBrokerListener {     

    @JmsListener(
        destination = "com.x.jee.ValidationBroker.jms.Out.Queue",
        selector = "event = 'MyEvent' AND participant = 'X'",
        concurrency = "1-10")
    @Transactional
    public void processMessage(Message message) throws JMSException, InterruptedException {
        // ...
    }
}
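
For comparison, here is a rough sketch of how the listener above might map onto the SmallRye JMS connector configuration. It assumes the incoming-channel attributes described in the SmallRye Reactive Messaging JMS documentation (connector, destination, destination-type, selector, session-mode); the channel name validation-events is hypothetical. The concurrency range ("1-10") and the transactional behaviour are left out, since no equivalent connector attribute is assumed here, and those are the parts this request is mostly about.

```yaml
mp:
  messaging:
    incoming:
      validation-events:                 # hypothetical channel name
        connector: smallrye-jms
        destination: com.x.jee.ValidationBroker.jms.Out.Queue
        destination-type: queue
        # same JMS selector as in the Spring listener
        selector: "event = 'MyEvent' AND participant = 'X'"
        session-mode: AUTO_ACKNOWLEDGE   # assumption: matches the session mode used in the resource above
```

A bean method annotated with @Incoming("validation-events") would then receive the filtered messages, assuming the connector can work with the ConnectionFactory produced by quarkus-artemis-jms.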

Or, in the old days of EAP (nothing fancy):

@MessageDriven(name = "MyMDB", activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "maxSession", propertyValue = "${Messaging.maxSession:3}"),
    @ActivationConfigProperty(propertyName = "destination", propertyValue = "com/x/Messaging/MyQueue"),
    @ActivationConfigProperty(propertyName = "rebalanceConnections", propertyValue = "true") 
})
@ResourceAdapter("${MyRarName}")
public class MyMDB implements MessageListener {  
    public void onMessage(Message msg) {
        // ...
    }
}

Many thanks.

Implementation ideas

No response
