-
Notifications
You must be signed in to change notification settings - Fork 1.1k
GH-9368: Support for adding MqttMessageDrivenChannelAdapter at runtime #9382
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@Ricore72 Please sign the Contributor License Agreement! Click here to manually synchronize the status of this Pull Request. See the FAQ for frequently asked questions. |
@Ricore72 Thank you for signing the Contributor License Agreement! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, add your name to the @author
list of all the affected class.
Would be also great to add a whats-new.adoc
for this new feature and explain in in the target mqtt.adoc
where we describe Shared MQTT Client Support
.
Thanks
...-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/ClientManager.java
Show resolved
Hide resolved
@Override | ||
public boolean isConnection() { | ||
if (getClient() != null) { | ||
return getClient().isConnected(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better to extract into a local variable.
I mean IMqttAsyncClient client = getClient()
.
Also wrap the logic into a this.lock.lock();
.
See start()
for example.
@Override | ||
public boolean isConnection() { | ||
if (getClient() != null) { | ||
return getClient().isConnected(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DITTO
@@ -203,6 +203,9 @@ protected void onInit() { | |||
super.onInit(); | |||
if (this.clientManager != null) { | |||
this.clientManager.addCallback(this); | |||
if (this.clientManager.isConnection()) { | |||
connectComplete(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think has to be false
since we indeed not reconnecting.
|
||
@Bean | ||
public IntegrationFlow mqttOutFlow(Mqttv3ClientManager mqttv3ClientManager) { | ||
return f -> f.handle(new MqttPahoMessageHandler(mqttv3ClientManager)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This configuration does not reflect the purpose of the issue.
I believe these new tests are going to work even without the change in the client managers and channel adapters.
Please, consider to provide the test configuration according to your experience: register channel adapters at runtime via IntegrationFlowContext
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, sorry I modified it incorrectly. Mqttv3ClientManager
was meaned to be started in before returning. Didnt noticed olny one test is failing on main
I added test with IntegrationFlowContext
in the modifications and fixed the this configuration.
4 tests failing on main
now
* @return the managed clients isConnected. | ||
* @since 6.4 | ||
*/ | ||
boolean isConnection(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the name of the method is slightly wrong.
I believe it has to be isConnected()
.
|
||
[[x6.4-mqtt-changes]] | ||
=== mqtt Changes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MQTT Support Changes
, please.
|
||
[source,java] | ||
---- | ||
private void addAddRuntimeAdapter(IntegrationFlowContext flowContext,Mqttv5ClientManager clientManager, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whitespace is supposed to be after each comma.
Also double check, please, that there are no tab indents in all those lines of code.
...on-mqtt/src/test/java/org/springframework/integration/mqtt/ClientManagerBackToBackTests.java
Show resolved
Hide resolved
var clientManager = ctx.getBean(ClientManager.class); | ||
var output = new QueueChannel(); | ||
Class<?>[] parameterTypes = {ClientManager.class, String[].class}; | ||
Constructor<?> declaredConstructor = adapter.getConstructor(parameterTypes); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see a reason in this reflection.
You just can have a factory method in the Mqttv5ConfigRuntime
which would return us a respective MessageProducerSupport
instance.
The @Configuration
class is registered as a bean as well.
So, you might just need to extract some common interface for both of your Mqttv5ConfigRuntime
and Mqttv3ConfigRuntime
to have that factory method implemented respectively.
@Bean | ||
public Mqttv5ClientManager mqttv5ClientManager() { | ||
Mqttv5ClientManager manager = new Mqttv5ClientManager(MosquittoContainerTest.mqttUrl(), "client-manager-client-id-v5"); | ||
manager.start(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You still didn't address this.
One more time: it is an anti-pattern to have component started too early.
Therefore the whole purpose of these ...WithStartedManager
configurations is wrong.
I suggestion to remove them altogether with their dedicated tests.
The regular (recommended) application context start is covered by other tests.
What we need for this change is really that based on the IntegrationFlowContext
and beans added at runtime.
Therefore I'd expect only two new tests in your PR.
Thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am sorry.
Removed the tests and Messed up the history. Thank you for your feedback and time. I think it must be hard to cooperate with someone like me. Hope everything is ok now.
1) exposing isConnection() in ClientManager 2) implementing isConnection() in Mqttv3ClientManager and Mqttv5ClientManager 3) Modifining OnInit() in AbstractMqttMessageDrivenChannelAdapter
Adding tests for addition of MessageDrivenAdapters at runtime. 1) Adding config classes Mqttv3ConfigWithStartedManager, Mqttv5ConfigWithStartedManager. 2) Adding test for config classes above.
* `ClientManager` - @SInCE 6.4 comment to `isConnecttion()` * `Mqttv3ClientManager`, `Mqttv5ClientManager` - adding local variable and lock logic * `AbstractMqttMessageDrivenChannelAdapter` - changed `connectCompete()` to false
* `testV3ClientManagerRuntime` and `testV5ClientManagerRuntime` adding MessageDrivenAdaptes at runtime using `IntegrationFlowContext`
…v5PahoMessageDrivenChannelAdapter
* `ClientManager`, `Mqttv3ClientManager`, `Mqttv5ClientManager`, `AbstractMqttMessageDrivenChannelAdapter` - renamig `isConnection()` to `isConnected()` * fixing docs `mqtt.adoc` and `whats-new.adoc` * `ClientManagerBackToBackTests` adding factory interface `MessageDrivenChannelAdapterFactory` to create adapters
thank you for the contribution; looking forward for more! |
Fixes: #9368
Currently, it is not possible to add
MqttMessageDrivenChannelAdapter
afterMqttClient
is connected, usingClientManager
in constructor.isConnection()
inClientManager
isConnection()
inMqttv3ClientManager
andMqttv5ClientManager
OnInit()
inAbstractMqttMessageDrivenChannelAdapter
to setreadyToSubscribeOnStart
= trueClientManagerBackToBackTests
to check functionality (testV3ClientManagerStarted
,testV5ClientManagerStarted
)