Skip to content

GH-3685: Share MQTT connection across components #3857

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from

Conversation

oxcafedead
Copy link
Contributor

Fixes #3685

Introduce some initial design.
Add a new interface MqttClientManager which will manage clients and
connections.
Use this manager in v3 topic adapter and message handler.

@oxcafedead
Copy link
Contributor Author

Hi @artembilan
Can you please review - this is not a complete implementation but rather some approach. I have replaced v3 classes and adjusted tests to prove it's working (and added a couple of new tests as well).
I'd like to hear your feedback. Is this something which you think may be a solution?

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You know doesn't look so bad.
But still some concerns to clean up.
I wish we have started from a central management for the client from day first...

@oxcafedead
Copy link
Contributor Author

oxcafedead commented Jul 27, 2022

I've added some changes to use client manager in adapters and handlers, also added support for both versions of clients.
TODO: unit tests covering shared client usage, possible bugfixes, XSD schemas and parsers update, documentation. (also squash commits)

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: unit tests covering shared client usage, possible bugfixes, XSD schemas and parsers update, documentation. (also squash commits)

Unit tests are great to have.
The rest of work can be done in the separate PRs.
No, please, not squashing: we do that on merge.
It is better to preserve commits history in the PR as is: must easier to review.

@oxcafedead oxcafedead marked this pull request as ready for review July 28, 2022 14:02
Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oxcafedead ,

your contribution is great and very valuable.
No need to worry about me: this is my project and it is my responsibility to review your changes as long as it is needed.

That's my apologies that this task takes more time, than it looked originally: you see I also change my mind from time to time. Plus indeed I feel like I learn something from you as well 😄

So, big thanks to you again and keep her steady!

Let me know when you are going to give up, so I'll take from there my self: we still have to implement this feature since it looks like very reasonable from target use-case 😉

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, consider to rebase your branch to the latest main: we finally switched to the CompletableFuture whenever ListenableFuture is deprecated.
This way your local build won't fail for some unexpected reasons 😄

@oxcafedead
Copy link
Contributor Author

@artembilan - I will push latest changes with rebase in an hour.

Fixes spring-projects#3685

Introduce some initial design.
Add a new interface `ClientManager` which will manage clients and
connections.
Use this manager in v3 topic adapter and message handler.
Fixes spring-projects#3685

Add a new interface `ClientManager` which will manage clients and
connections.
Add different implementations for v3 and v5 MQTT clients.
Use this manager in v3/v5 topic adapters and message handlers.
Fixes spring-projects#3685

Add a couple of unit/integration tests to cover client manager usage.
Several small code improvements after the code review:
* Improve client manager usage via providing several mutual exclusive
constructors, whether the users provides `url` or `connectionOptions`
or `clientFactory` for v3.
* Move the logger to `AbstractMqttClientManager`
* Do not inject TaskScheduler in constructor for v3 client manager
but use lazy init via `BeanFactory` and `IntegrationContextUtils`
* Other smaller code readability improvements
Fixes spring-projects#3685

Add new tests with reconnect cases.
Other code improvements after the code review:
* Adjust javadocs according to standards
* Remove `setClientManager` and use exclusive ctors
* Make automatic reconnects using the v3 client instead of manually
using task scheduler
Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't review v3 variant because I was not involved in its implementation back in days, but I believe with our discussion to realign both protocols in their ClientManagers my v5 review will apply there as well.

MessageListener[] listeners = IntStream.range(0, topics.length)
.mapToObj(t -> new MessageListener())
.toArray(MessageListener[]::new);
getClientManager().getClient().subscribe(subscriptions, null, null, listeners, null)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder what is wrong with this method:

public IMqttToken subscribe(MqttSubscription[] subscriptions, IMqttMessageListener messageListener) throws MqttException;

and remember we talked about using a MessageListener even of an internally controlled client?

That's why I'm asking about that this.mqttClient = getClientManager().getClient()

Copy link
Contributor Author

@oxcafedead oxcafedead Jul 31, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially I have used this method you mentioned - but it gives a runtime exception. Yeah, I found it when ran my back-to-back integration tests with mosquitto. And it seems like the bug in the client lib.
org/eclipse/paho/mqttv5/client/MqttAsyncClient.java:1220 - passes an empty new MqttProperties()
and then org/eclipse/paho/mqttv5/client/MqttAsyncClient.java:1276 throws no such element

and remember we talked about using a MessageListener even of an internally controlled client?

Probably some other misunderstanding... sorry, I don't get it. MessageListener is the internal class of adapter which is a bridge: it implements the interface IMqttMessageListener of the library and reuses the existing logic of messageArrived method of the adapter class.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The messageArrived() is not this class method. It is dictated by that MqttCallback we implement on this class.
With my finding it looks like this messageArrived() and IMqttMessageListener are exactly for the same purpose.
With only a difference that MqttCallback is per client, but IMqttMessageListener is per request. Which is exactly what we need for our "shared client" use-case.
At the same time I want to see if the same IMqttMessageListener can be used for our case where we create client internally.

I also wonder if it really makes sense to new MessageListener() for every request, but not have it as a single one per channel adapter and just reuse for all the requests.
Kinda some minor, but still optimization.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

org/eclipse/paho/mqttv5/client/MqttAsyncClient.java:1220 - passes an empty new MqttProperties()
and then org/eclipse/paho/mqttv5/client/MqttAsyncClient.java:1276 throws no such element

That is good catch! And I can confirm the bug is there.
Here is a bug report related: eclipse-paho/paho.mqtt.java#826

@oxcafedead
Copy link
Contributor Author

@artembilan I will push all the changes regarding you last review which I understand 🙂 and marked as resolved.
To my regret, since next week I will have less time as my vacation ends 😄 so apologies in advance if some changes will take more time to appear...

Fixes spring-projects#3685

Some fixes and improvements after another code review iteration:
* Rearrange the code according to the code style guides
* Move client instance to `AbstractClientManager` with `isRunning`
method
* Fix abstract adapter/handler fields visibility and `final`ize them
where we can
* Send application event if automatic reconnect is not enabled for the
client manager
Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say we are very close to the final solution.

Please, take a look what you can do with this my latest review.

* @author Artem Vozhdayenko
* @since 6.0
*/
@LongRunningTest
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How slow is this so it has driven you to for such a marker?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It took ~18 seconds.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, just found - after implementing event-based subscription and count down latches in tests - now tests pass in 4 seconds (last run result).
Maybe it makes sense to remove this @LongRunningTest anno

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so. Thanks. I believe we need to take into account the start for the Mosquito container anyway

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you going to remove this annotation?
How slow is the test right now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Last run time it was ~6.5 secs. I suppose we can remove it now, since we have other Mosquitto tests without it, plus the container creation time is 'shared' among these tests, as the container should be reused if I am not mistaken.
I will remove it.

@oxcafedead
Copy link
Contributor Author

@artembilan - I just ran all tests again and now I got the issue with client not connected in doStart again (after I have removed reconnect() invocation). For some reason, the default timeout is not enough. Sometimes. It sporadically passes. Probably the tests order matters? If container has not managed to fully start yet. However, sometimes I have 3 out of 4 failed - it is 20 * 3 = a whole minute of mqtt server unavailability. Strange.

2022-08-04 01:18:18,399 ERROR [Test worker] [org.springframework.integration.mqtt.core.Mqttv5ClientManager] - could not start client manager, client_id=client-manager-client-id-v5
org.eclipse.paho.mqttv5.common.MqttException: Timed out waiting for a response from the server

As the result, it does not reconnect immediately and test is failed with null message after the timeout, because the adapter has not subscribed to topics (which happens only on doStart method). There is a convenient callback connectComplete(isReconnect) to determine if the client has been connected the first time to subscribe. We could improve the logic by passing this callback through the client manager to adapter class. I will try to make it and see if tests will be stable and green.

@oxcafedead
Copy link
Contributor Author

The approach with connectComplete broke v3 adapter because it contradicts with manual scheduleReconnects.
This becomes more messy, I will get back to it when I have time...

@artembilan
Copy link
Member

The approach with connectComplete broke v3 adapter

That's correct. That's why at some point I asked if we can re-write v3 channel adapter logic to avoid that manual reconnetion.
Kinda implement it the same way we have now in v5 version of the channel adapter.

The Mqttv5PahoMessageDrivenChannelAdapter has that connectComplete(boolean reconnect, String serverURI) impl from the MqttCallback to make a subscription when we really have connected to the server.
This one works for an internal client.
Since we cannot provide this MqttCallback to a shared client, we have to implement something like addConnectCompleteCallback(BiConsumer<boolean, String> connectCompleteCallback) in the ClientManager.
Store all the provided into a List and delegate to them from the ClientManager 's MqttCallback.connectComplete() impl.
That add is going to be called from the onInit() (or doStart()) of the message-driver channel adapter.
Of course, it sounds like removeConnectCompleteCallback() has to be there as well to be called from doStop() or destroy().

@@ -96,6 +90,9 @@ public synchronized void start() {
logger.error("MQTT client failed to re-connect.", ex);
}
}
else if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(new MqttConnectionFailedEvent(this, e));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to extract to the local variable and avoid extra method call.
I don't think the compiler is smart enough to do such an optimization on our behalf...

@@ -95,6 +89,9 @@ public synchronized void start() {
logger.error("MQTT client failed to re-connect.", ex);
}
}
else if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(new MqttConnectionFailedEvent(this, e));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DITTO

@@ -74,7 +74,7 @@

private MqttMessageConverter converter;

protected ClientManager<T> clientManager;
private final ClientManager<T> clientManager;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just nit-pick: final props must be before the others.

public MqttPahoMessageDrivenChannelAdapter(String url, String clientId, String... topic) {
this(url, clientId, new DefaultMqttPahoClientFactory(), topic);
public MqttPahoMessageDrivenChannelAdapter(ClientManager<IMqttAsyncClient> clientManager, String... topic) {
this(clientManager, new MqttConnectOptions(), topic);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that we have discussed: no contradicting options. Better to take them from the provided client.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still haven't pushed any changes yet 🙂
Sorry, maybe the fact I resolve our conversations confuses you. I do it for myself to check that locally I have resolved this. Probably better to leave them unless the code it on 'your' side..

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see . Sorry, yes, makes sense to have it resolved that way. Really confused me: have a lot to review in parallel , so I lose context from time to time 😅

@@ -162,6 +204,11 @@ protected void onInit() {

@Override
protected void doStart() {
if (getClientManager() != null) {
subscribeToAll();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh! I see now: you have broken the logic.
We must not try to subscribe in the doStart().
We have to try to do that exactly from the connectComplete() callback.
The problem is present just because mqttClient.reconnect() is an async scheduled operation.
So, even if client manager is started, it doesn't mean that it has been connected.
Therefore we really need to wait for the connectComplete() before trying to subscribe.
Please, see my other comment where I suggest to implement addConnectCompleteCallback()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we cannot provide this MqttCallback to a shared client, we have to implement something like addConnectCompleteCallback(BiConsumer<boolean, String> connectCompleteCallback) in the ClientManager.

I already did smth similar but I have created an inner Callback interface in ClientManager with semantically clear method connectComplete(boolean isReconnect, String serverURI) - I guess it should be easier for users to understand the purpose of the callback when they see parameter and method names then just plain BiConsumer with types. If you don't have objections I would like to use this interface.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, we can go that way, but let’s name it a ConnectCallback to be more specific . Also I don’t see a reason in the url param: we definitely know to what manager we provide a callback.

MessageListener[] listeners = IntStream.range(0, topics.length)
.mapToObj(t -> new MessageListener())
.toArray(MessageListener[]::new);
getClientManager().getClient().subscribe(subscriptions, null, null, listeners, null)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

org/eclipse/paho/mqttv5/client/MqttAsyncClient.java:1220 - passes an empty new MqttProperties()
and then org/eclipse/paho/mqttv5/client/MqttAsyncClient.java:1276 throws no such element

That is good catch! And I can confirm the bug is there.
Here is a bug report related: eclipse-paho/paho.mqtt.java#826

@oxcafedead
Copy link
Contributor Author

That's correct. That's why at some point I asked if we can re-write v3 channel adapter logic to avoid that manual reconnetion.
Kinda implement it the same way we have now in v5 version of the channel adapter.

I think this will be much better and cleaner, to get rid of this task scheduler and scheduled reconnects.
However, this will be not backwards compatible. Now, if automatic reconnect is turned off for some reason, the client will still be automatically recreated in scheduleReconnect method. I know this is an obvious bug - but may not be so obvious for some users 🙂
If you don't see here any issues - fine by me.

@artembilan
Copy link
Member

If you don't see here any issues - fine by me.

thanks for confirmation. No, I don’t see an issue . This is a major release, so it is fully fine to some breaking changes. More over I believe most solution are based on just an url for MQTT broker in which case we add an auto-recconnect option internally . Either way we are going to document this properly .

@oxcafedead
Copy link
Contributor Author

If you don't see here any issues - fine by me.

thanks for confirmation. No, I don’t see an issue . This is a major release, so it is fully fine to some breaking changes. More over I believe most solution are based on just an url for MQTT broker in which case we add an auto-recconnect option internally . Either way we are going to document this properly .

OK, and regarding recoveryInterval - seems like this property becomes useless. Can we just remove it as well?

@artembilan
Copy link
Member

OK, and regarding recoveryInterval - seems like this property becomes useless. Can we just remove it as well?

yes, this makes sense . Let’s remove it then! Thank you for the pointer!

Honestly, my idea was to implement the feature for v5. Confirm that it is OK and then rework v3 accordingly . Now it’s up to you to go ahead with your convenient way.

@oxcafedead
Copy link
Contributor Author

yes, this makes sense . Let’s remove it then! Thank you for the pointer!

Honestly, my idea was to implement the feature for v5. Confirm that it is OK and then rework v3 accordingly . Now it’s up to you to go ahead with your convenient way.

As far as I understand, this functionality can be covered by MqttConnectionOptions.automaticReconnectMinDelay, MqttConnectionOptions.automaticReconnectMaxDelay so users should have this flexibility - however not configurable via the XML approach conveniently.

@artembilan
Copy link
Member

Right , we just remove that option from the XSD as well. Let’s talk about an XML support when we are done with this. No reason to cover everything in one PR: the bigger it is - the slower review and merge process.

Fixes spring-projects#3685

Other fixes and improvements after code review:
* Changes around fields, methods, ctors visibility
* Removed contradictory ctors
* Reduce amount of unnecessary `getClientManager() != null` checks
in logic and make it as similar as possible for client manager and the
old approach
* Use auto-reconnect where possible
* Remove manual reconnect trigger and rely on events instead to know
where to subscribe
* Do not close the connection in adapter to be able to use reconnect
logic without lose of subscriptions
* Make `ClientManager` extend `MqttComponent` so that it knows about
connection options as part of its contract
* Remove not relevant auto test cases (relying on connection close or
manual reconnect)
* Other code style smaller changes
@oxcafedead
Copy link
Contributor Author

Phew, finally, another push... a lot of changes. Tests pass locally, finally

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The review for current state of code.
Again: I didn't review v3 changes, just because I didn't find v5 implemented correctly yet.

Thanks and sorry that this task became so complex.

}

protected Set<ConnectCallback> getCallbacks() {
return Collections.unmodifiableSet(this.connectCallbacks);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method is protected, so unlikely going to be used for some outside harm.
Therefore I don't see a reason in extra wrapping to prevent modification.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just the defense if someone decides to implement own client manager incorrectly, but I will remove it, no problem

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to implement and to use is not the same and there is likely less harm when it is an internal logic than some end-user API.

catch (MqttException e) {
logger.error("could not start client manager, client_id=" + getClientId(), e);

var applicationEventPublisher = getApplicationEventPublisher();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why no reconnect logic as we discussed?
Exactly the same what we have right now in the Mqttv5PahoMessageDrivenChannelAdapter.doStart().
That exactly was a purpose of our discussion around that setAutomaticReconnect(true) option.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I don't see any reason to call reconnect manually because if automatic reconnect is enabled, the client is reconnected automatically anyway (tests show it works). This reconnect just zeroes out the timer.
Also if automatic reconnect is disabled, I thought we have discussed that the user gets info message about it and have to subscribe to MqttConnectionFailedEvent

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, see this issue for more info: #3822.

In two words: if client is not connected via first connect(), the reconnect() is not initiated.
That's why we have that logic in the Mqttv5PahoMessageDrivenChannelAdapter.doStart().

I wonder if you have modified the Mqttv5BackToBackAutomaticReconnectTests to adjust its behavior to the logic you have changed.
However it should be like: "why this test is failing when I have removed reconnect()"?

WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I see now, I was really sure that there was no point for reconnect but this makes sense now. Indeed I should have made git blame, sometimes I think that smth might be implemented for a reason but not this time, haha.

return this.clientManagerConnectCallback;
}

protected void setClientManagerCallback(ClientManager.ConnectCallback clientManagerConnectCallback) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not.
The AbstractMqttMessageDrivenChannelAdapter creates such a ConnectCallback internally and register it with the provided ClientManager.
See, for example, an AbstractRSocketConnector and its addEndpoint() call from the RSocketInboundGateway.
No reason to have some extra API around: we just call getClientManager().addCallback() from target channel adapter impl.
I don't see, though, how you connect two of them here...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This setter and field is just to be able to save the listener instance to remove it later: see in destroy method,

if (clientManager != null) {
	clientManager.removeCallback(getClientManagerCallback());
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah! I see. Yep, you are right!
How about something like clientManager.addCallback(this) and clientManager.removeCallback(this)?
And have that ClientManager.ConnectCallback impl directly on the AbstractMqttMessageDrivenChannelAdapter.
This is an impl for just one more method which is going to delegate to another.
And there won't be those protected settter and getter.

Or is this going to be too confusing from code perspective?
For end-user nothing changed though: they even not going to use this API in most cases.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think both approaches are not ideal 🙂 - but I guess having protected getters/setters is kinda an overkill for it.
I like your approach with (this) better. I will implement it.


private static MqttConnectionOptions buildDefaultConnectionOptions(@Nullable String url) {
final MqttConnectionOptions connectionOptions;
connectionOptions = new MqttConnectionOptions();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why divide declaration and initialization to two separate lines?

.withCauseExactlyInstanceOf(MqttException.class)
.withRootCauseExactlyInstanceOf(UnknownHostException.class);

connectionOptions.setServerURIs(new String[]{ MosquittoContainerTest.mqttUrl() });
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't you remove this as a collateral damage just because you have removed a reconnect() from the channel adapter when client is created internally?

catch (MqttException e) {
logger.error("could not start client manager, client_id=" + getClientId(), e);

var applicationEventPublisher = getApplicationEventPublisher();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, see this issue for more info: #3822.

In two words: if client is not connected via first connect(), the reconnect() is not initiated.
That's why we have that logic in the Mqttv5PahoMessageDrivenChannelAdapter.doStart().

I wonder if you have modified the Mqttv5BackToBackAutomaticReconnectTests to adjust its behavior to the logic you have changed.
However it should be like: "why this test is failing when I have removed reconnect()"?

WDYT?

return this.clientManagerConnectCallback;
}

protected void setClientManagerCallback(ClientManager.ConnectCallback clientManagerConnectCallback) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah! I see. Yep, you are right!
How about something like clientManager.addCallback(this) and clientManager.removeCallback(this)?
And have that ClientManager.ConnectCallback impl directly on the AbstractMqttMessageDrivenChannelAdapter.
This is an impl for just one more method which is going to delegate to another.
And there won't be those protected settter and getter.

Or is this going to be too confusing from code perspective?
For end-user nothing changed though: they even not going to use this API in most cases.

@@ -104,7 +92,7 @@ public static class Config {
@Bean
public MqttConnectionOptions mqttConnectOptions() {
return new MqttConnectionOptionsBuilder()
.serverURI("wss://badMqttUrl")
.serverURI(MosquittoContainerTest.mqttUrl())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was really the point to have this test exactly this way.
Please, see my comment about remove reconnect() an related issue when that feature and this test have need introduced.
And consider to reinstate that logic and this test.

Thank you!

Fixes spring-projects#3685

Other fixes and improvements after code review:
* Get manual `reconnect` invocation back for v3/v5 adapters and client
managers (see bug spring-projectsGH-3822 for a reasoning)
* Remove unnecessary getters/setter for a listener and use adapter
class as listener instead
* Optimize MessageListener: remove redundant inner class and use a
single method reference instead of N instances per each subscribe
* Javadocs improvements
@artembilan
Copy link
Member

GH-3685: Share MQTT connection across components

No need to add a header to every single commit.
In the end, on merge, I'm going to squash them all to a single one.

I prefer to add extra commits just with the messages about changes in this commit.
Now I see what you meant about adding them, but seeing a header on all your commits it confuses me since the rest of comments are hidden. 😄

This probably was just my bad habit to believe that people does not add comments into their commits.
Sorry I will try to be more careful net time

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good progress!

Thank you for your patience!

AbstractMqttClientManager(String clientId) {
Assert.notNull(clientId, "'clientId' is required");
this.clientId = clientId;
this.connectCallbacks = Collections.synchronizedSet(new HashSet<>());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that I have commented before that this initialization could be done directly on the property declaration.

.waitForCompletion(this.connectionOptions.getConnectionTimeout());
}
catch (MqttException e) {
logger.error("could not start client manager, client_id=" + getClientId(), e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need this noise in logs since we are going to initiate a reconnect action.
Otherwise we are going to publish that MqttConnectionFailedEvent.
We may log in the third (very rare) case when there is no applicationEventPublisher provided.

this.clientId = null;
}

private Set<Topic> initTopics(String[] topic) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

static ?

this.connectionOptions = clientManager.getConnectionInfo();
}

private static MqttConnectionOptions buildDefaultConnectionOptions(@Nullable String url) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need this method any more.
Also I think we just need to add Assert.hasText(url, "'url' cannot be null or empty"); into that url-based ctor.
Just because we cannot an empty url without external options: or url, or options with server uris.

@@ -158,30 +185,39 @@ protected void onInit() {
.getBean(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME,
SmartMessageConverter.class));
}
if (clientManager != null) {
clientManager.addCallback(this);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cannot we do this in a single place in the super.onInit() ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea!

this.mqttClient.close(true);
}
}
catch (MqttException ex) {
logger.error(ex, "Failed to close 'MqttAsyncClient'");
}
if (clientManager != null) {
clientManager.removeCallback(this);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here about moving this logic into a common super.destroy()

this.connectionOptions = clientManager.getConnectionInfo();
}

private static MqttConnectionOptions buildDefaultConnectionOptions(@Nullable String url) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think that we don't need this method and url must not be null for that url-based ctor.
Exactly the same comment I did for the message-driven channel adapter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Makes sense.

}
else {
if (!this.mqttClient.isConnected()) {
this.mqttClient.connect(this.connectionOptions).waitForCompletion(completionTimeout);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we have to try to reconnect over here independently of the client source.
Even if client is provided by the manager, it is still legit to call this connect() in case of lost connection in the client.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you are right. Won't be harmful - except that if client is being connected in background, connect method will throw an MqttException. But this is probably a rare case.

* @author Artem Vozhdayenko
* @since 6.0
*/
@LongRunningTest
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you going to remove this annotation?
How slow is the test right now?

* Extract common callback add/rm logic to abstract adapter class
* Small code cleanups/fixes related to code style & simplicity, ctor
inits and unnecessary methods; eliminate unnecessary logs noise
* Remove `@LongRunningTest` for `ClientManagerBackToBackTests` as test
run time is ~6-7 secs
Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some further review.


private String url;

private volatile T client;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just another nit-pick: volatile props go after all others 😄

}

/**
* Sets the phase of component autostart in {@link SmartLifecycle}.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -221,7 +250,8 @@ public void addTopic(String topic, int qos) {
try {
super.addTopic(topic, qos);
if (this.mqttClient != null && this.mqttClient.isConnected()) {
this.mqttClient.subscribe(topic, qos).waitForCompletion(getCompletionTimeout());
this.mqttClient.subscribe(new MqttSubscription(topic, qos), this::messageArrived)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to subscribe to newly added topics even if the client comes from the manager?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We init mqttClient field from the client manager in the start method, just as for v3 adapter 🙂
So this code is almost not changed, at least no additional ifs

}
}

@Override
protected void doStop() {
try {
this.mqttClient.disconnect().waitForCompletion(getDisconnectCompletionTimeout());
if (this.mqttClient != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you do now:

			else if (clientManager != null) {
				this.mqttClient = clientManager.getClient();
			}

in the doStart(), we have to check here for the getClientManager() != null before performing disconnect().
As we agreed: we cannot not close the client we don't owe.
Same applies for the close() later on in the destroy()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right - changed it at the last moment and missed this part

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if the mqttClient != null check is redundant in doStart/doStop/destroy methods. It cannot be null from onInit method - if client cannot be created we throw BeanCreationException

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's it.
Reviewed all of them.

Thank you!

this(buildDefaultClientFactory(url), clientId);
}

public Mqttv3ClientManager(MqttPahoClientFactory clientFactory, String clientId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we still need to rely on this factory for this our new ClientManager abstraction.
How about just to make this v3 version similar to v5 - based on the MqttConnectionOptions.
And perhaps we need to add one more MqttClientPersistence option to these managers impls.
A setter should be OK.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, initially if I remember correctly you asked to implement client manager using existing client factory abstraction 😄

if (this.client instanceof MqttClient) {
((MqttClient) this.client).setTimeToWait(getCompletionTimeout());

long completionTimeout = getCompletionTimeout();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason to extract this into a variable: used only in one place.

private void subscribe() {
var clientManager = getClientManager();
if (clientManager != null && this.client == null) {
this.client = clientManager.getClient();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we still need this logic in the subscribe(). Looks like you are extracting this.client in the connect(), so should be OK here in the subscribe() to not worry about that property value any more.
Do I miss anything?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you're right, no sense to have it duplicated

IMqttToken subscribeToken = this.client.subscribe(topics, requestedQos, listeners);
subscribeToken.waitForCompletion(getCompletionTimeout());
int[] grantedQos = subscribeToken.getGrantedQos();
if (grantedQos.length == 1 && grantedQos[0] == 0x80) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Magic number? // NOSONAR ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an interesting case 🙂
Initially v3 adapter had a blocking client impl and I have changed it to use async client.
One test case started failing after that, it was MqttAdapterTests#testSubscribeFailure.
You can see it mocks this magic number 0x80:
given(token.getGrantedQos()).willReturn(new int[]{ 0x80 });

I have looked through the code of a blocking client and it is just a wrapper around async one - but it really has this strange difference in subscribe method, /org/eclipse/paho/client/mqttv3/MqttClient.java:465

		if (grantedQos.length == 1 && qos[0] == 0x80) {
			throw new MqttException(MqttException.REASON_CODE_SUBSCRIBE_FAILED);
		}

So I decided to keep the behavior, especially given we had a unit test for it.

I will add //NOSONAR for now..

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure! Makes sense. Let's live with this for now only in v3 impl!

.mapToObj(t -> listener)
.toArray(IMqttMessageListener[]::new);
this.mqttClient.subscribe(subscriptions, null, null, listeners, null)
.waitForCompletion(getCompletionTimeout());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to do the same subscribeToken.getGrantedQos() logic over here as well, like you have introduced in v3 impl?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For v5 we didn't have it initially. So I am not sure. Probably not?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I don't find this 0x80 in the blocking client in v5 as well. Probably it was something related to v3 only.

plain connection properties and client persistence instead
* Add missed javadocs
* Other code style & cleanup improvements
Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's it.
I'll pull this locally today for final review and possible merging.

Thank you very much for a great work!

this.clientId = clientId;
}

protected void setManualAcks(boolean manualAcks) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that this one has to be public

@artembilan
Copy link
Member

Merged as 5f12729 after some minor code clean up and refactoring.

@oxcafedead ,
this was huge; excellent collaboration!
Now, if you wish, you can go ahead and add docs for this new feature. See mqtt.adoc. I believe a new === Shared MQTT Client Support paragraph in the end would be great.
Thank you very much!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Share MQTT connection between inbound & outbound IntegrationFlows
2 participants