Skip to content

Commit

Permalink
[3.x] 6262 JMS JNDI destination support (helidon-io#6301)
Browse files Browse the repository at this point in the history
* 6262 JMS JNDI destination support
  • Loading branch information
danielkec authored Mar 3, 2023
1 parent 22344a5 commit 9295062
Show file tree
Hide file tree
Showing 16 changed files with 264 additions and 41 deletions.
3 changes: 2 additions & 1 deletion docs/mp/reactivemessaging/jms.adoc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
///////////////////////////////////////////////////////////////////////////////

Copyright (c) 2020, 2022 Oracle and/or its affiliates.
Copyright (c) 2020, 2023 Oracle and/or its affiliates.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -76,6 +76,7 @@ Expression can only access headers and properties, not the payload.
|`session-group-id` | When multiple channels share same `session-group-id`,
they share same JMS session and same JDBC connection as well.
|`jndi.jms-factory` | JNDI name of JMS factory.
|`jndi.destination` | JNDI destination identifier.
|`jndi.env-properties` | Environment properties used for creating initial context `java.naming.factory.initial`, `java.naming.provider.url` ...
|`producer.someproperty` | property with producer prefix is set to producer instance (for example WLS Unit-of-Order `WLMessageProducer.setUnitOfOrder("unit-1")` can be configured as `producer.unit-of-order=unit-1`)
|===
Expand Down
4 changes: 3 additions & 1 deletion docs/mp/reactivemessaging/weblogic.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ Connector name: `helidon-weblogic-jms`
|`credentials` | WebLogic initial context credential(password)
|`type` | Possible values are: `queue`, `topic`. Default value is: `topic`
|`destination` | Queue or topic name in WebLogic CDI Syntax(CDI stands for Create Destination Identifier)
|`jndi.destination` | JNDI destination identifier. When no such JNDI destination is found, falls back to `destination` with CDI syntax.
|`acknowledge-mode` |Possible values are: `AUTO_ACKNOWLEDGE`- session automatically acknowledges a client’s receipt of a message,
`CLIENT_ACKNOWLEDGE` - receipt of a message is acknowledged only when `Message.ack()` is called manually,
`DUPS_OK_ACKNOWLEDGE` - session lazily acknowledges the delivery of messages. Default value: `AUTO_ACKNOWLEDGE`
Expand Down Expand Up @@ -114,7 +115,8 @@ mp:
outgoing:
to-wls:
connector: helidon-weblogic-jms
destination: ./TestJMSModule!TestQueue
# JNDI identifier for the same queue
jndi.destination: jms/TestQueue
----
When configuring destination with WebLogic CDI, the following syntax needs to be applied:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021 Oracle and/or its affiliates.
* Copyright (c) 2020, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

package io.helidon.examples.messaging.mp;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.SubmissionPublisher;

import io.helidon.common.reactive.Multi;
Expand Down Expand Up @@ -77,22 +79,26 @@ public ProcessorBuilder<String, Message<String>> multiply() {
* Broadcasts an event.
*
* @param msg Message to broadcast
* @return completed stage
*/
@Incoming("fromJms")
public void broadcast(JmsMessage<String> msg) {
public CompletionStage<Void> broadcast(JmsMessage<String> msg) {
// Broadcast to all subscribers
broadCaster.submit(msg.getPayload());
return CompletableFuture.completedFuture(null);
}

/**
* Same JMS session, different connector.
*
* @param msg Message to broadcast
* @return completed stage
*/
@Incoming("fromJmsSameSession")
public void sameSession(JmsMessage<String> msg) {
public CompletionStage<Void> sameSession(JmsMessage<String> msg) {
// Broadcast to all subscribers
broadCaster.submit(msg.getPayload());
return CompletableFuture.completedFuture(null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2022 Oracle and/or its affiliates.
# Copyright (c) 2022, 2023 Oracle and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,5 +38,5 @@ mp:
outgoing:
to-wls:
connector: helidon-weblogic-jms
# Same queue is used for simplifying test case
destination: ./TestJMSModule!TestQueue
# JNDI identifier for the same queue
jndi.destination: jms/TestQueue
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ Optional<? extends ConnectionFactory> lookupFactory(String jndi) {
}

Optional<? extends Destination> lookupDestination(String jndi) {
return Optional.ofNullable((Destination) lookup(jndi))
return Optional.ofNullable(lookup(jndi))
.map(o -> JakartaJms.resolve(o, Destination.class));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.helidon.messaging.MessagingException;
import io.helidon.messaging.NackHandler;
import io.helidon.messaging.Stoppable;
import io.helidon.messaging.connectors.jms.shim.JakartaJms;
import io.helidon.messaging.connectors.jms.shim.JakartaWrapper;

import jakarta.enterprise.context.ApplicationScoped;
Expand Down Expand Up @@ -296,13 +297,16 @@ public class JmsConnector implements IncomingConnectorFactory, OutgoingConnector
static final String SCHEDULER_THREAD_NAME_PREFIX = "jms-poll-";
static final String EXECUTOR_THREAD_NAME_PREFIX = "jms-";

private final Instance<ConnectionFactory> connectionFactories;
private final Instance<ConnectionFactory> jakartaConnectionFactories;

private final ScheduledExecutorService scheduler;
private final ExecutorService executor;
private final Map<String, SessionMetadata> sessionRegister = new HashMap<>();
private final Map<String, ConnectionFactory> connectionFactoryMap;

@Inject
private Instance<javax.jms.ConnectionFactory> javaxConnectionFactories;

/**
* Provides a {@link JmsConnectorBuilder} for creating
* a {@link io.helidon.messaging.connectors.jms.JmsConnector} instance.
Expand Down Expand Up @@ -334,12 +338,13 @@ public static JmsConfigBuilder configBuilder() {
/**
* Create new JmsConnector.
*
* @param connectionFactories connection factory beans
* @param jakartaConnectionFactories connection factory beans
* @param config root config for thread context
*/
@Inject
protected JmsConnector(io.helidon.config.Config config, Instance<ConnectionFactory> connectionFactories) {
this.connectionFactories = connectionFactories;
protected JmsConnector(io.helidon.config.Config config,
Instance<ConnectionFactory> jakartaConnectionFactories) {
this.jakartaConnectionFactories = jakartaConnectionFactories;
this.connectionFactoryMap = Map.of();
scheduler = ScheduledThreadPoolSupplier.builder()
.threadNamePrefix(SCHEDULER_THREAD_NAME_PREFIX)
Expand All @@ -363,7 +368,8 @@ protected JmsConnector(io.helidon.config.Config config, Instance<ConnectionFacto
protected JmsConnector(Map<String, ConnectionFactory> connectionFactoryMap,
ScheduledExecutorService scheduler,
ExecutorService executor) {
this.connectionFactories = null;
this.jakartaConnectionFactories = null;
this.javaxConnectionFactories = null;
this.connectionFactoryMap = connectionFactoryMap;
this.scheduler = scheduler;
this.executor = executor;
Expand Down Expand Up @@ -453,20 +459,20 @@ protected Optional<? extends ConnectionFactory> getFactory(ConnectionContext ctx
if (factoryName.isPresent()) {
// Check SE map and MP instance for named factories
return Optional.ofNullable(connectionFactoryMap.get(factoryName.get()))
.or(() ->
Optional.ofNullable(connectionFactories)
.flatMap(s -> s.select(NamedLiteral.of(factoryName.get()))
.stream()
.findFirst()
)
);
.or(() -> getConnectionFactoryBean(factoryName.get()));
}

// Check SE map and MP instance for any factories
return connectionFactoryMap.values().stream().findFirst()
.or(() -> Optional.ofNullable(connectionFactories)
.flatMap(s -> s.stream().findFirst())
);
.or(() -> getConnectionFactoryBean(factoryName.get()));
}

private <T> Optional<ConnectionFactory> getConnectionFactoryBean(String name){
NamedLiteral literal = NamedLiteral.of(name);
return jakartaConnectionFactories.select(literal)
.stream()
.findFirst()
.or(() -> javaxConnectionFactories.select(literal).stream().map(JakartaJms::create).findFirst());
}

@Override
Expand Down Expand Up @@ -711,7 +717,6 @@ protected Destination createDestination(Session session, ConnectionContext ctx)

if (ctx.isJndi()) {
Optional<? extends Destination> jndiDestination = ctx.lookupDestination();
// JNDI can be used for looking up ConnectorFactory only
if (jndiDestination.isPresent()) {
return jndiDestination.get();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#
# Copyright (c) 2023 Oracle and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

pattern=weblogic.**;java.util.**;java.lang.**;java.io.**;java.rmi.**
10 changes: 10 additions & 0 deletions tests/integration/jms/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,15 @@
<artifactId>helidon-microprofile-tests-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.helidon.messaging.mock</groupId>
<artifactId>helidon-messaging-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2022 Oracle and/or its affiliates.
* Copyright (c) 2020, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,7 +28,6 @@
public class AbstractJmsTest {

static final String BROKER_URL = "vm://localhost?broker.persistent=false";
// static final String BROKER_URL = "tcp://localhost:61616";
static Session session;
static ConnectionFactory connectionFactory;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2022 Oracle and/or its affiliates.
* Copyright (c) 2020, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -98,7 +98,7 @@ public static class ChannelAck extends AbstractSampleBean {

@Incoming("test-channel-ack-1")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<String> channelAck(Message<String> msg) {
public CompletionStage<Void> channelAck(Message<String> msg) {
LOGGER.fine(() -> String.format("Received %s", msg.getPayload()));
consumed().add(msg.getPayload());
if (msg.getPayload().startsWith("NO_ACK")) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021 Oracle and/or its affiliates.
* Copyright (c) 2020, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,7 +32,6 @@
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.inject.se.SeContainer;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
Expand All @@ -54,16 +53,14 @@
@AddConfig(key = "mp.messaging.connector.helidon-jms.jndi.env-properties.java.naming.factory.initial",
value = "org.apache.activemq.jndi.ActiveMQInitialContextFactory"),

@AddConfig(key = "mp.messaging.connector.helidon-jms.period-executions", value = "5"),

@AddConfig(key = "mp.messaging.incoming.test-channel-ack-1.connector", value = JmsConnector.CONNECTOR_NAME),
@AddConfig(key = "mp.messaging.incoming.test-channel-ack-1.acknowledge-mode", value = "CLIENT_ACKNOWLEDGE"),
@AddConfig(key = "mp.messaging.incoming.test-channel-ack-1.type", value = "queue"),
@AddConfig(key = "mp.messaging.incoming.test-channel-ack-1.destination", value = AckMpTest.TEST_QUEUE_ACK),
})
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@Disabled("3.0.0-JAKARTA")
//java.lang.ClassCastException: class org.apache.activemq.ActiveMQConnectionFactory cannot be cast to class jakarta.jms
// .ConnectionFactory (org.apache.activemq.ActiveMQConnectionFactory and jakarta.jms.ConnectionFactory are in unnamed module of
// loader 'app')
public class AckMpTest extends AbstractMPTest {

static final String TEST_QUEUE_ACK = "queue-ack";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.messaging.connectors.jms;

import java.time.Duration;
import java.util.stream.IntStream;

import io.helidon.common.reactive.Multi;
import io.helidon.messaging.connectors.jms.shim.JakartaJms;
import io.helidon.messaging.connectors.mock.MockConnector;
import io.helidon.messaging.connectors.mock.TestConnector;
import io.helidon.microprofile.config.ConfigCdiExtension;
import io.helidon.microprofile.messaging.MessagingCdiExtension;
import io.helidon.microprofile.tests.junit5.AddBean;
import io.helidon.microprofile.tests.junit5.AddConfig;
import io.helidon.microprofile.tests.junit5.AddExtension;
import io.helidon.microprofile.tests.junit5.DisableDiscovery;
import io.helidon.microprofile.tests.junit5.HelidonTest;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.junit.jupiter.api.Test;

import static io.helidon.messaging.connectors.jms.JmsConnector.CONNECTOR_NAME;
import static org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory.CONNECTOR_PREFIX;
import static org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory.INCOMING_PREFIX;
import static org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory.OUTGOING_PREFIX;

@HelidonTest
@DisableDiscovery
@AddBean(JmsConnector.class)
@AddBean(MockConnector.class)
@AddExtension(ConfigCdiExtension.class)
@AddExtension(MessagingCdiExtension.class)
@AddConfig(key = CONNECTOR_PREFIX + CONNECTOR_NAME + ".period-executions", value = "5")
@AddConfig(key = CONNECTOR_PREFIX + CONNECTOR_NAME + ".destination", value = "TestQueue1")

@AddConfig(key = OUTGOING_PREFIX + "to-jms.connector", value = CONNECTOR_NAME)
@AddConfig(key = OUTGOING_PREFIX + "to-jms.named-factory", value = "activemq-cf-jakarta")
@AddConfig(key = INCOMING_PREFIX + "from-jms.connector", value = CONNECTOR_NAME)
@AddConfig(key = INCOMING_PREFIX + "from-jms.named-factory", value = "activemq-cf-jakarta")
@AddConfig(key = OUTGOING_PREFIX + "to-mock.connector", value = MockConnector.CONNECTOR_NAME)
public class JmsInjectedFactoryTest {

static final Duration TIME_OUT = Duration.ofSeconds(15);
static final Integer[] TEST_DATA = IntStream.range(0, 10).boxed().toArray(Integer[]::new);

@Inject
@TestConnector
private MockConnector mockConnector;

@Produces
@ApplicationScoped
@Named("activemq-cf-jakarta")
public jakarta.jms.ConnectionFactory connectionFactory() {
return JakartaJms.create(new ActiveMQConnectionFactory(AbstractJmsTest.BROKER_URL));
}

@Outgoing("to-jms")
public Multi<String> produceData() {
return Multi.just(TEST_DATA)
.map(String::valueOf);
}

@Incoming("from-jms")
@Outgoing("to-mock")
public ProcessorBuilder<String, Integer> resendToMock() {
return ReactiveStreams.<String>builder()
.map(Integer::parseInt);
}

@Test
void jmsInOutTest() {
mockConnector.outgoing("to-mock", Integer.TYPE)
.awaitPayloads(TIME_OUT, TEST_DATA);
}
}
Loading

0 comments on commit 9295062

Please sign in to comment.