Skip to content

Commit

Permalink
6391 JMS intermittent test fix (helidon-io#6393)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielkec authored Mar 9, 2023
1 parent 52d9227 commit 829c0c0
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;

import static io.helidon.messaging.connectors.jms.AcknowledgeMode.AUTO_ACKNOWLEDGE;

public class AbstractJmsTest {

static final String BROKER_URL = "vm://localhost?broker.persistent=false";
Expand All @@ -45,4 +50,19 @@ static void tearDown() throws Exception {
session.close();
}

static void clearQueue(String queueName){
var cf = JakartaJms.create(new ActiveMQConnectionFactory(AbstractJmsTest.BROKER_URL));
try (Connection conn = cf.createConnection();
var s = conn.createSession(false, AUTO_ACKNOWLEDGE.getAckMode())) {
conn.start();
Queue queue = s.createQueue(queueName);
MessageConsumer cons = s.createConsumer(queue);
jakarta.jms.Message m;
do {
m = cons.receive(100L);
} while (m != null);
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021 Oracle and/or its affiliates.
* Copyright (c) 2020, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -85,12 +85,13 @@ Stream<Message> consumeAllCurrent(String topic) {
Message m;
List<Message> result = new ArrayList<>();
for (; ; ) {
m = consumer.receive(50L);
m = consumer.receive(500L);
if (m == null) {
break;
}
result.add(m);
}
consumer.close();
return result.stream();
} catch (JMSException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,25 +93,6 @@ protected void countDown(String method) {
}
}

@ApplicationScoped
public static class ChannelAck extends AbstractSampleBean {

@Incoming("test-channel-ack-1")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<Void> channelAck(Message<String> msg) {
LOGGER.fine(() -> String.format("Received %s", msg.getPayload()));
consumed().add(msg.getPayload());
if (msg.getPayload().startsWith("NO_ACK")) {
LOGGER.fine(() -> String.format("NOT Acked %s", msg.getPayload()));
} else {
LOGGER.fine(() -> String.format("Acked %s", msg.getPayload()));
msg.ack();
}
countDown("channel1()");
return CompletableFuture.completedFuture(null);
}
}

@ApplicationScoped
public static class Channel1 extends AbstractSampleBean {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@

package io.helidon.messaging.connectors.jms;

import java.lang.annotation.Annotation;
import java.time.Duration;
import java.util.List;

import io.helidon.messaging.connectors.mock.MockConnector;
import io.helidon.messaging.connectors.mock.TestConnector;
import io.helidon.microprofile.config.ConfigCdiExtension;
import io.helidon.microprofile.messaging.MessagingCdiExtension;
import io.helidon.microprofile.tests.junit5.AddBean;
Expand All @@ -29,19 +33,24 @@
import io.helidon.microprofile.tests.junit5.DisableDiscovery;
import io.helidon.microprofile.tests.junit5.HelidonTest;

import jakarta.annotation.PostConstruct;
import jakarta.enterprise.inject.se.SeContainer;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;

import static java.lang.System.Logger.Level.DEBUG;

@HelidonTest(resetPerTest = true)
@DisableDiscovery
@AddBeans({
@AddBean(JmsConnector.class),
@AddBean(AbstractSampleBean.ChannelAck.class),
@AddBean(MockConnector.class),
})
@AddExtensions({
@AddExtension(ConfigCdiExtension.class),
Expand All @@ -59,42 +68,56 @@
@AddConfig(key = "mp.messaging.incoming.test-channel-ack-1.acknowledge-mode", value = "CLIENT_ACKNOWLEDGE"),
@AddConfig(key = "mp.messaging.incoming.test-channel-ack-1.type", value = "queue"),
@AddConfig(key = "mp.messaging.incoming.test-channel-ack-1.destination", value = AckMpTest.TEST_QUEUE_ACK),

@AddConfig(key = "mp.messaging.outgoing.mock-conn-channel.connector", value = MockConnector.CONNECTOR_NAME),
})
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class AckMpTest extends AbstractMPTest {

static final String TEST_QUEUE_ACK = "queue-ack";

@PostConstruct
void cleanupBefore() {
//cleanup not acked messages
consumeAllCurrent(TEST_QUEUE_ACK)
.map(JmsMessage::of)
.forEach(Message::ack);
private static final System.Logger LOGGER = System.getLogger(AckMpTest.class.getName());
private static final Annotation TEST_CONNECTOR_ANNOTATION = MockConnector.class.getAnnotation(TestConnector.class);

@Incoming("test-channel-ack-1")
@Outgoing("mock-conn-channel")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public Message<String> channelAck(Message<String> msg) {
LOGGER.log(DEBUG, () -> String.format("Received %s", msg.getPayload()));
if (msg.getPayload().startsWith("NO_ACK")) {
LOGGER.log(DEBUG, () -> String.format("NOT Acked %s", msg.getPayload()));
} else {
LOGGER.log(DEBUG, () -> String.format("Acked %s", msg.getPayload()));
msg.ack();
}
return msg;
}

@Test
@Order(1)
void resendAckTestPart1(SeContainer cdi) {
MockConnector mockConnector = cdi.select(MockConnector.class, TEST_CONNECTOR_ANNOTATION).get();
//Messages starting with NO_ACK is not acked by ChannelAck bean
List<String> testData = List.of("0", "1", "2", "NO_ACK-1", "NO_ACK-2", "NO_ACK-3");
AbstractSampleBean bean = cdi.select(AbstractSampleBean.ChannelAck.class).get();
produceAndCheck(bean, testData, TEST_QUEUE_ACK, testData);
bean.restart();
produce(TEST_QUEUE_ACK, testData, m -> {});
mockConnector.outgoing("mock-conn-channel", String.class)
.awaitPayloads(Duration.ofSeconds(5), testData.toArray(String[]::new));
}

@Test
@Order(2)
void resendAckTestPart2(SeContainer cdi) {
try {
AbstractSampleBean bean = cdi.select(AbstractSampleBean.ChannelAck.class).get();
//Send nothing just check if not acked messages are redelivered
produceAndCheck(bean, List.of(), TEST_QUEUE_ACK, List.of("NO_ACK-1", "NO_ACK-2", "NO_ACK-3"));
} finally {
//cleanup not acked messages
consumeAllCurrent(TEST_QUEUE_ACK)
.map(JmsMessage::of)
.forEach(Message::ack);
}
MockConnector mockConnector = cdi.select(MockConnector.class, TEST_CONNECTOR_ANNOTATION).get();

//Check if not acked messages are redelivered
mockConnector.outgoing("mock-conn-channel", String.class)
.requestMax()
.awaitCount(Duration.ofSeconds(5), 1)
.awaitPayloads(Duration.ofSeconds(5), "NO_ACK-1", "NO_ACK-2", "NO_ACK-3");
}

@AfterAll
static void afterAll() {
AbstractJmsTest.clearQueue(TEST_QUEUE_ACK);
}
}

0 comments on commit 829c0c0

Please sign in to comment.