Skip to content

GH-9368: Support for adding MqttMessageDrivenChannelAdapter at runtime #9382

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,7 @@
*
* @author Artem Vozhdayenko
* @author Artem Bilan
* @author Jiri Soucek
*
* @since 6.0
*/
Expand Down Expand Up @@ -68,6 +69,13 @@ public interface ClientManager<T, C> extends SmartLifecycle, MqttComponent<C> {
*/
boolean removeCallback(ConnectCallback connectCallback);

/**
* Return the managed clients isConnected.
* @return the managed clients isConnected.
* @since 6.4
*/
boolean isConnected();

/**
* A contract for a custom callback on {@code connectComplete} event from the client.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
* @author Artem Vozhdayenko
* @author Artem Bilan
* @author Christian Tzolov
* @author Jiri Soucek
*
* @since 6.0
*/
Expand Down Expand Up @@ -198,4 +199,19 @@ public void deliveryComplete(IMqttDeliveryToken token) {
// nor this manager concern
}

@Override
public boolean isConnected() {
this.lock.lock();
try {
IMqttAsyncClient client = getClient();
if (client != null) {
return client.isConnected();
}
return false;
}
finally {
this.lock.unlock();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
* @author Artem Vozhdayenko
* @author Artem Bilan
* @author Christian Tzolov
* @author Jiri Soucek
*
* @since 6.0
*/
Expand Down Expand Up @@ -206,4 +207,18 @@ public void mqttErrorOccurred(MqttException exception) {
logger.error("MQTT error occurred", exception);
}

@Override
public boolean isConnected() {
this.lock.lock();
try {
IMqttAsyncClient client = getClient();
if (client != null) {
return client.isConnected();
}
return false;
}
finally {
this.lock.unlock();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -49,6 +49,7 @@
* @author Trung Pham
* @author Mikhail Polivakha
* @author Artem Vozhdayenko
* @author Jiri Soucek
*
* @since 4.0
*
Expand Down Expand Up @@ -203,6 +204,9 @@ protected void onInit() {
super.onInit();
if (this.clientManager != null) {
this.clientManager.addCallback(this);
if (this.clientManager.isConnected()) {
connectComplete(false);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@
import org.eclipse.paho.client.mqttv3.MqttException;
import org.junit.jupiter.api.Test;

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.context.IntegrationFlowContext;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.core.Mqttv3ClientManager;
import org.springframework.integration.mqtt.core.Mqttv5ClientManager;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
Expand Down Expand Up @@ -70,12 +74,24 @@ void testV3ClientManagerReconnect() throws Exception {
Mqttv3ConfigWithDisconnect.subscribedLatch);
}

@Test
void testV3ClientManagerRuntime() throws Exception {
testSubscribeAndPublishRuntime(Mqttv3ConfigRuntime.class, Mqttv3ConfigRuntime.TOPIC_NAME,
Mqttv3ConfigRuntime.subscribedLatch);
}

@Test
void testV5ClientManagerReconnect() throws Exception {
testSubscribeAndPublish(Mqttv5ConfigWithDisconnect.class, Mqttv5ConfigWithDisconnect.TOPIC_NAME,
Mqttv5ConfigWithDisconnect.subscribedLatch);
}

@Test
void testV5ClientManagerRuntime() throws Exception {
testSubscribeAndPublishRuntime(Mqttv5ConfigRuntime.class, Mqttv5ConfigRuntime.TOPIC_NAME,
Mqttv5ConfigRuntime.subscribedLatch);
}

private void testSubscribeAndPublish(Class<?> configClass, String topicName, CountDownLatch subscribedLatch)
throws Exception {

Expand All @@ -102,6 +118,39 @@ private void testSubscribeAndPublish(Class<?> configClass, String topicName, Cou
}
}

private void testSubscribeAndPublishRuntime(Class<?> configClass, String topicName, CountDownLatch subscribedLatch)
throws Exception {

try (var ctx = new AnnotationConfigApplicationContext(configClass)) {
// given
var input = ctx.getBean("mqttOutFlow.input", MessageChannel.class);
var flowContext = ctx.getBean(IntegrationFlowContext.class);
var factory = ctx.getBean(MessageDrivenChannelAdapterFactory.class);
var output = new QueueChannel();

flowContext.registration(IntegrationFlow
.from(factory.createMessageDrivenAdapter(ctx))
.channel(output)
.get()).register();
String testPayload = "foo";
assertThat(subscribedLatch.await(20, TimeUnit.SECONDS)).isTrue();

// when
input.send(MessageBuilder.withPayload(testPayload).setHeader(MqttHeaders.TOPIC, topicName).build());
Message<?> receive = output.receive(20_000);

// then
assertThat(receive).isNotNull();
Object payload = receive.getPayload();
if (payload instanceof String sp) {
assertThat(sp).isEqualTo(testPayload);
}
else {
assertThat(payload).isEqualTo(testPayload.getBytes(StandardCharsets.UTF_8));
}
}
}

@Configuration
@EnableIntegration
public static class Mqttv3Config {
Expand Down Expand Up @@ -177,6 +226,39 @@ public IntegrationFlow mqttInFlow(Mqttv3ClientManager mqttv3ClientManager) {

}

@Configuration
@EnableIntegration
public static class Mqttv3ConfigRuntime implements MessageDrivenChannelAdapterFactory {

static final String TOPIC_NAME = "test-topic-v3";

static final CountDownLatch subscribedLatch = new CountDownLatch(1);

@EventListener
public void onSubscribed(MqttSubscribedEvent e) {
subscribedLatch.countDown();
}

@Bean
public Mqttv3ClientManager mqttv3ClientManager() {
MqttConnectOptions connectionOptions = new MqttConnectOptions();
connectionOptions.setServerURIs(new String[] {MosquittoContainerTest.mqttUrl()});
connectionOptions.setAutomaticReconnect(true);
return new Mqttv3ClientManager(connectionOptions, "client-manager-client-id-v3");
}

@Bean
public IntegrationFlow mqttOutFlow(Mqttv3ClientManager mqttv3ClientManager) {
return f -> f.handle(new MqttPahoMessageHandler(mqttv3ClientManager));
}

@Override
public MessageProducerSupport createMessageDrivenAdapter(ApplicationContext ctx) {
var clientManager = ctx.getBean(Mqttv3ClientManager.class);
return new MqttPahoMessageDrivenChannelAdapter(clientManager, TOPIC_NAME);
}
}

@Configuration
@EnableIntegration
public static class Mqttv5Config {
Expand Down Expand Up @@ -247,6 +329,41 @@ public IntegrationFlow mqttInFlow(Mqttv5ClientManager mqttv5ClientManager) {

}

@Configuration
@EnableIntegration
public static class Mqttv5ConfigRuntime implements MessageDrivenChannelAdapterFactory {

static final String TOPIC_NAME = "test-topic-v5";

static final CountDownLatch subscribedLatch = new CountDownLatch(1);

@EventListener
public void onSubscribed(MqttSubscribedEvent e) {
subscribedLatch.countDown();
}

@Bean
public Mqttv5ClientManager mqttv5ClientManager() {
return new Mqttv5ClientManager(MosquittoContainerTest.mqttUrl(), "client-manager-client-id-v5");
}

@Bean
@ServiceActivator(inputChannel = "mqttOutFlow.input")
public Mqttv5PahoMessageHandler mqttv5PahoMessageHandler(Mqttv5ClientManager mqttv5ClientManager) {
return new Mqttv5PahoMessageHandler(mqttv5ClientManager);
}

@Override
public MessageProducerSupport createMessageDrivenAdapter(ApplicationContext ctx) {
var clientManager = ctx.getBean(Mqttv5ClientManager.class);
return new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, TOPIC_NAME);
}
}

interface MessageDrivenChannelAdapterFactory {
MessageProducerSupport createMessageDrivenAdapter(ApplicationContext ctx);
}

record ClientV3Disconnector(Mqttv3ClientManager clientManager) {

@EventListener(MqttSubscribedEvent.class)
Expand Down
22 changes: 19 additions & 3 deletions src/reference/antora/modules/ROOT/pages/mqtt.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -382,9 +382,9 @@ public class MqttJavaApplication {
.run(args);
}

@Bean
public IntegrationFlow mqttOutboundFlow() {
return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
@Bean
public IntegrationFlow mqttOutboundFlow() {
return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
}

}
Expand Down Expand Up @@ -548,3 +548,19 @@ public IntegrationFlow mqttOutFlow(
return f -> f.handle(new Mqttv5PahoMessageHandler(clientManager));
}
----

NOTE: Starting with version 6.4, multiple instances of `MqttPahoMessageDrivenChannelAdapter` and `Mqttv5PahoMessageDrivenChannelAdapter` can now be added at runtime using corresponding `ClientManager` through `IntegrationFlowContext`

[source,java]
----
private void addAddRuntimeAdapter(IntegrationFlowContext flowContext, Mqttv5ClientManager clientManager,
String topic, MessageChannel channel) {
flowContext
.registration(
IntegrationFlow
.from(new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, topic))
.channel(channel)
.get())
.register();
}
----
10 changes: 9 additions & 1 deletion src/reference/antora/modules/ROOT/pages/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,16 @@ See xref:redis.adoc[Redis Support] for more information.
The `ControlBusFactoryBean` (and respective `<int-groovy:control-bus>` XML tag) has been deprecated (for removal) in favor of new introduced `ControlBusFactoryBean` based on a new model implemented in the `ControlBusCommandRegistry`.
See xref:control-bus.adoc[Control Bus] for more information.


[[x6.4-sftp-changes]]
=== SFTP Support Changes

The `DefaultSftpSessionFactory` now exposes a `Consumer<SshClient>` configurer property to further customize an internal `SshClient`.
See xref:sftp/session-factory.adoc[SFTP Session Factory] for more information.
See xref:sftp/session-factory.adoc[SFTP Session Factory] for more information.

[[x6.4-mqtt-support-changes]]
=== MQTT Support Changes

Multiple instances of `MqttPahoMessageDrivenChannelAdapter` and `Mqttv5PahoMessageDrivenChannelAdapter` can now be added at runtime using corresponding `ClientManager` through `IntegrationFlowContext`
See xref:mqtt.adoc[MQTT Support] for more information.