Skip to content

GH-9368: Support for adding MqttMessageDrivenChannelAdapter at runtime #9382

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Aug 19, 2024

Conversation

Ricore72
Copy link
Contributor

Fixes: #9368

Currently, it is not possible to add MqttMessageDrivenChannelAdapter after MqttClient is connected, using ClientManager in constructor.

  • Exposed isConnection() in ClientManager
  • Implemented isConnection() in Mqttv3ClientManager and Mqttv5ClientManager
  • Modified OnInit() in AbstractMqttMessageDrivenChannelAdapter to set readyToSubscribeOnStart = true
  • Added tests to ClientManagerBackToBackTests to check functionality (testV3ClientManagerStarted, testV5ClientManagerStarted)

@pivotal-cla
Copy link

@Ricore72 Please sign the Contributor License Agreement!

Click here to manually synchronize the status of this Pull Request.

See the FAQ for frequently asked questions.

@pivotal-cla
Copy link

@Ricore72 Thank you for signing the Contributor License Agreement!

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, add your name to the @author list of all the affected class.

Would be also great to add a whats-new.adoc for this new feature and explain in in the target mqtt.adoc where we describe Shared MQTT Client Support.

Thanks

@Override
public boolean isConnection() {
if (getClient() != null) {
return getClient().isConnected();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to extract into a local variable.
I mean IMqttAsyncClient client = getClient().
Also wrap the logic into a this.lock.lock();.
See start() for example.

@Override
public boolean isConnection() {
if (getClient() != null) {
return getClient().isConnected();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DITTO

@@ -203,6 +203,9 @@ protected void onInit() {
super.onInit();
if (this.clientManager != null) {
this.clientManager.addCallback(this);
if (this.clientManager.isConnection()) {
connectComplete(true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think has to be false since we indeed not reconnecting.


@Bean
public IntegrationFlow mqttOutFlow(Mqttv3ClientManager mqttv3ClientManager) {
return f -> f.handle(new MqttPahoMessageHandler(mqttv3ClientManager));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This configuration does not reflect the purpose of the issue.
I believe these new tests are going to work even without the change in the client managers and channel adapters.
Please, consider to provide the test configuration according to your experience: register channel adapters at runtime via IntegrationFlowContext.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, sorry I modified it incorrectly. Mqttv3ClientManager was meaned to be started in before returning. Didnt noticed olny one test is failing on main

I added test with IntegrationFlowContext in the modifications and fixed the this configuration.

4 tests failing on main now

* @return the managed clients isConnected.
* @since 6.4
*/
boolean isConnection();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the name of the method is slightly wrong.
I believe it has to be isConnected().


[[x6.4-mqtt-changes]]
=== mqtt Changes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MQTT Support Changes, please.


[source,java]
----
private void addAddRuntimeAdapter(IntegrationFlowContext flowContext,Mqttv5ClientManager clientManager,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whitespace is supposed to be after each comma.
Also double check, please, that there are no tab indents in all those lines of code.

var clientManager = ctx.getBean(ClientManager.class);
var output = new QueueChannel();
Class<?>[] parameterTypes = {ClientManager.class, String[].class};
Constructor<?> declaredConstructor = adapter.getConstructor(parameterTypes);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a reason in this reflection.
You just can have a factory method in the Mqttv5ConfigRuntime which would return us a respective MessageProducerSupport instance.
The @Configuration class is registered as a bean as well.
So, you might just need to extract some common interface for both of your Mqttv5ConfigRuntime and Mqttv3ConfigRuntime to have that factory method implemented respectively.

@Ricore72 Ricore72 requested a review from artembilan August 16, 2024 14:20
@Bean
public Mqttv5ClientManager mqttv5ClientManager() {
Mqttv5ClientManager manager = new Mqttv5ClientManager(MosquittoContainerTest.mqttUrl(), "client-manager-client-id-v5");
manager.start();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You still didn't address this.
One more time: it is an anti-pattern to have component started too early.
Therefore the whole purpose of these ...WithStartedManager configurations is wrong.
I suggestion to remove them altogether with their dedicated tests.
The regular (recommended) application context start is covered by other tests.
What we need for this change is really that based on the IntegrationFlowContext and beans added at runtime.
Therefore I'd expect only two new tests in your PR.

Thanks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am sorry.
Removed the tests and Messed up the history. Thank you for your feedback and time. I think it must be hard to cooperate with someone like me. Hope everything is ok now.

1) exposing isConnection() in ClientManager
2) implementing isConnection() in Mqttv3ClientManager and Mqttv5ClientManager
3) Modifining OnInit() in AbstractMqttMessageDrivenChannelAdapter
Adding tests for addition of MessageDrivenAdapters at runtime.

1) Adding config classes Mqttv3ConfigWithStartedManager, Mqttv5ConfigWithStartedManager.
2) Adding test for config classes above.
* `ClientManager` - @SInCE 6.4 comment to `isConnecttion()`
* `Mqttv3ClientManager`, `Mqttv5ClientManager` - adding local variable and lock logic
* `AbstractMqttMessageDrivenChannelAdapter` - changed `connectCompete()` to false
* `testV3ClientManagerRuntime` and `testV5ClientManagerRuntime` adding MessageDrivenAdaptes at runtime using `IntegrationFlowContext`
* `ClientManager`, `Mqttv3ClientManager`, `Mqttv5ClientManager`, `AbstractMqttMessageDrivenChannelAdapter` - renamig `isConnection()` to `isConnected()`
* fixing docs `mqtt.adoc` and `whats-new.adoc`
* `ClientManagerBackToBackTests` adding factory interface `MessageDrivenChannelAdapterFactory` to create adapters
@Ricore72 Ricore72 requested a review from artembilan August 16, 2024 22:35
@artembilan artembilan merged commit d1146e5 into spring-projects:main Aug 19, 2024
3 checks passed
@artembilan
Copy link
Member

@Ricore72 ,

thank you for the contribution; looking forward for more!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Adding Mqttv5PahoMessageDrivenChannelAdapter to running Mqttv5ClientManager
3 participants