-
Notifications
You must be signed in to change notification settings - Fork 1.1k
GH-3685: Share MQTT connection across components #3857
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Hi @artembilan |
...egration-mqtt/src/main/java/org/springframework/integration/mqtt/core/MqttClientManager.java
Outdated
Show resolved
Hide resolved
...n/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You know doesn't look so bad.
But still some concerns to clean up.
I wish we have started from a central management for the client from day first...
...-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/ClientManager.java
Outdated
Show resolved
Hide resolved
...ration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java
Outdated
Show resolved
Hide resolved
...ration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java
Outdated
Show resolved
Hide resolved
...ration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java
Outdated
Show resolved
Hide resolved
...ration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java
Outdated
Show resolved
Hide resolved
...ration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java
Outdated
Show resolved
Hide resolved
...ration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java
Outdated
Show resolved
Hide resolved
...ration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java
Outdated
Show resolved
Hide resolved
...n/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java
Outdated
Show resolved
Hide resolved
...java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java
Outdated
Show resolved
Hide resolved
I've added some changes to use client manager in adapters and handlers, also added support for both versions of clients. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: unit tests covering shared client usage, possible bugfixes, XSD schemas and parsers update, documentation. (also squash commits)
Unit tests are great to have.
The rest of work can be done in the separate PRs.
No, please, not squashing: we do that on merge.
It is better to preserve commits history in the PR as is: must easier to review.
...-mqtt/src/main/java/org/springframework/integration/mqtt/core/AbstractMqttClientManager.java
Show resolved
Hide resolved
...ration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java
Outdated
Show resolved
Hide resolved
...java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java
Show resolved
Hide resolved
...ration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java
Outdated
Show resolved
Hide resolved
...on-mqtt/src/test/java/org/springframework/integration/mqtt/ClientManagerBackToBackTests.java
Show resolved
Hide resolved
...on-mqtt/src/test/java/org/springframework/integration/mqtt/ClientManagerBackToBackTests.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
your contribution is great and very valuable.
No need to worry about me: this is my project and it is my responsibility to review your changes as long as it is needed.
That's my apologies that this task takes more time, than it looked originally: you see I also change my mind from time to time. Plus indeed I feel like I learn something from you as well 😄
So, big thanks to you again and keep her steady!
Let me know when you are going to give up, so I'll take from there my self: we still have to implement this feature since it looks like very reasonable from target use-case 😉
...ration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java
Outdated
Show resolved
Hide resolved
...ration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java
Show resolved
Hide resolved
...ration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java
Outdated
Show resolved
Hide resolved
...va/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java
Outdated
Show resolved
Hide resolved
...on-mqtt/src/test/java/org/springframework/integration/mqtt/ClientManagerBackToBackTests.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, consider to rebase your branch to the latest main
: we finally switched to the CompletableFuture
whenever ListenableFuture
is deprecated.
This way your local build won't fail for some unexpected reasons 😄
@artembilan - I will push latest changes with rebase in an hour. |
Fixes spring-projects#3685 Introduce some initial design. Add a new interface `ClientManager` which will manage clients and connections. Use this manager in v3 topic adapter and message handler.
Fixes spring-projects#3685 Add a new interface `ClientManager` which will manage clients and connections. Add different implementations for v3 and v5 MQTT clients. Use this manager in v3/v5 topic adapters and message handlers.
Fixes spring-projects#3685 Add a couple of unit/integration tests to cover client manager usage. Several small code improvements after the code review: * Improve client manager usage via providing several mutual exclusive constructors, whether the users provides `url` or `connectionOptions` or `clientFactory` for v3. * Move the logger to `AbstractMqttClientManager` * Do not inject TaskScheduler in constructor for v3 client manager but use lazy init via `BeanFactory` and `IntegrationContextUtils` * Other smaller code readability improvements
Fixes spring-projects#3685 Add new tests with reconnect cases. Other code improvements after the code review: * Adjust javadocs according to standards * Remove `setClientManager` and use exclusive ctors * Make automatic reconnects using the v3 client instead of manually using task scheduler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't review v3 variant because I was not involved in its implementation back in days, but I believe with our discussion to realign both protocols in their ClientManager
s my v5 review will apply there as well.
...-mqtt/src/main/java/org/springframework/integration/mqtt/core/AbstractMqttClientManager.java
Outdated
Show resolved
Hide resolved
...ration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java
Outdated
Show resolved
Hide resolved
...ration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java
Outdated
Show resolved
Hide resolved
...ration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java
Outdated
Show resolved
Hide resolved
...ration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java
Outdated
Show resolved
Hide resolved
...java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java
Outdated
Show resolved
Hide resolved
MessageListener[] listeners = IntStream.range(0, topics.length) | ||
.mapToObj(t -> new MessageListener()) | ||
.toArray(MessageListener[]::new); | ||
getClientManager().getClient().subscribe(subscriptions, null, null, listeners, null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder what is wrong with this method:
public IMqttToken subscribe(MqttSubscription[] subscriptions, IMqttMessageListener messageListener) throws MqttException;
and remember we talked about using a MessageListener
even of an internally controlled client?
That's why I'm asking about that this.mqttClient = getClientManager().getClient()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Initially I have used this method you mentioned - but it gives a runtime exception. Yeah, I found it when ran my back-to-back integration tests with mosquitto. And it seems like the bug in the client lib.
org/eclipse/paho/mqttv5/client/MqttAsyncClient.java:1220
- passes an empty new MqttProperties()
and then org/eclipse/paho/mqttv5/client/MqttAsyncClient.java:1276
throws no such element
and remember we talked about using a MessageListener even of an internally controlled client?
Probably some other misunderstanding... sorry, I don't get it. MessageListener
is the internal class of adapter which is a bridge: it implements the interface IMqttMessageListener
of the library and reuses the existing logic of messageArrived
method of the adapter class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The messageArrived()
is not this class method. It is dictated by that MqttCallback
we implement on this class.
With my finding it looks like this messageArrived()
and IMqttMessageListener
are exactly for the same purpose.
With only a difference that MqttCallback
is per client, but IMqttMessageListener
is per request. Which is exactly what we need for our "shared client" use-case.
At the same time I want to see if the same IMqttMessageListener
can be used for our case where we create client internally.
I also wonder if it really makes sense to new MessageListener()
for every request, but not have it as a single one per channel adapter and just reuse for all the requests.
Kinda some minor, but still optimization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
org/eclipse/paho/mqttv5/client/MqttAsyncClient.java:1220 - passes an empty new MqttProperties()
and then org/eclipse/paho/mqttv5/client/MqttAsyncClient.java:1276 throws no such element
That is good catch! And I can confirm the bug is there.
Here is a bug report related: eclipse-paho/paho.mqtt.java#826
.../src/main/java/org/springframework/integration/mqtt/outbound/AbstractMqttMessageHandler.java
Outdated
Show resolved
Hide resolved
...tt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java
Outdated
Show resolved
Hide resolved
...tt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java
Outdated
Show resolved
Hide resolved
@artembilan I will push all the changes regarding you last review which I understand 🙂 and marked as resolved. |
Fixes spring-projects#3685 Some fixes and improvements after another code review iteration: * Rearrange the code according to the code style guides * Move client instance to `AbstractClientManager` with `isRunning` method * Fix abstract adapter/handler fields visibility and `final`ize them where we can * Send application event if automatic reconnect is not enabled for the client manager
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would say we are very close to the final solution.
Please, take a look what you can do with this my latest review.
...-mqtt/src/main/java/org/springframework/integration/mqtt/core/AbstractMqttClientManager.java
Outdated
Show resolved
Hide resolved
...java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java
Show resolved
Hide resolved
...java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java
Outdated
Show resolved
Hide resolved
...java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java
Outdated
Show resolved
Hide resolved
...java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java
Outdated
Show resolved
Hide resolved
...java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java
Outdated
Show resolved
Hide resolved
.../src/main/java/org/springframework/integration/mqtt/outbound/AbstractMqttMessageHandler.java
Outdated
Show resolved
Hide resolved
...tt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java
Outdated
Show resolved
Hide resolved
* @author Artem Vozhdayenko | ||
* @since 6.0 | ||
*/ | ||
@LongRunningTest |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How slow is this so it has driven you to for such a marker?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It took ~18 seconds.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, just found - after implementing event-based subscription and count down latches in tests - now tests pass in 4 seconds (last run result).
Maybe it makes sense to remove this @LongRunningTest
anno
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think so. Thanks. I believe we need to take into account the start for the Mosquito container anyway
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you going to remove this annotation?
How slow is the test right now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Last run time it was ~6.5 secs. I suppose we can remove it now, since we have other Mosquitto tests without it, plus the container creation time is 'shared' among these tests, as the container should be reused if I am not mistaken.
I will remove it.
@artembilan - I just ran all tests again and now I got the issue with client not connected in
As the result, it does not reconnect immediately and test is failed with null message after the timeout, because the adapter has not subscribed to topics (which happens only on |
The approach with |
That's correct. That's why at some point I asked if we can re-write v3 channel adapter logic to avoid that manual reconnetion. The |
@@ -96,6 +90,9 @@ public synchronized void start() { | |||
logger.error("MQTT client failed to re-connect.", ex); | |||
} | |||
} | |||
else if (getApplicationEventPublisher() != null) { | |||
getApplicationEventPublisher().publishEvent(new MqttConnectionFailedEvent(this, e)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better to extract to the local variable and avoid extra method call.
I don't think the compiler is smart enough to do such an optimization on our behalf...
@@ -95,6 +89,9 @@ public synchronized void start() { | |||
logger.error("MQTT client failed to re-connect.", ex); | |||
} | |||
} | |||
else if (getApplicationEventPublisher() != null) { | |||
getApplicationEventPublisher().publishEvent(new MqttConnectionFailedEvent(this, e)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DITTO
@@ -74,7 +74,7 @@ | |||
|
|||
private MqttMessageConverter converter; | |||
|
|||
protected ClientManager<T> clientManager; | |||
private final ClientManager<T> clientManager; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just nit-pick: final
props must be before the others.
public MqttPahoMessageDrivenChannelAdapter(String url, String clientId, String... topic) { | ||
this(url, clientId, new DefaultMqttPahoClientFactory(), topic); | ||
public MqttPahoMessageDrivenChannelAdapter(ClientManager<IMqttAsyncClient> clientManager, String... topic) { | ||
this(clientManager, new MqttConnectOptions(), topic); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought that we have discussed: no contradicting options. Better to take them from the provided client.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still haven't pushed any changes yet 🙂
Sorry, maybe the fact I resolve our conversations confuses you. I do it for myself to check that locally I have resolved this. Probably better to leave them unless the code it on 'your' side..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see . Sorry, yes, makes sense to have it resolved that way. Really confused me: have a lot to review in parallel , so I lose context from time to time 😅
...java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java
Show resolved
Hide resolved
@@ -162,6 +204,11 @@ protected void onInit() { | |||
|
|||
@Override | |||
protected void doStart() { | |||
if (getClientManager() != null) { | |||
subscribeToAll(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh! I see now: you have broken the logic.
We must not try to subscribe in the doStart()
.
We have to try to do that exactly from the connectComplete()
callback.
The problem is present just because mqttClient.reconnect()
is an async scheduled operation.
So, even if client manager is started, it doesn't mean that it has been connected.
Therefore we really need to wait for the connectComplete()
before trying to subscribe.
Please, see my other comment where I suggest to implement addConnectCompleteCallback()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we cannot provide this
MqttCallback
to a shared client, we have to implement something likeaddConnectCompleteCallback(BiConsumer<boolean, String> connectCompleteCallback
) in the ClientManager.
I already did smth similar but I have created an inner Callback
interface in ClientManager
with semantically clear method connectComplete(boolean isReconnect, String serverURI)
- I guess it should be easier for users to understand the purpose of the callback when they see parameter and method names then just plain BiConsumer
with types. If you don't have objections I would like to use this interface.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, we can go that way, but let’s name it a ConnectCallback
to be more specific . Also I don’t see a reason in the url
param: we definitely know to what manager we provide a callback.
MessageListener[] listeners = IntStream.range(0, topics.length) | ||
.mapToObj(t -> new MessageListener()) | ||
.toArray(MessageListener[]::new); | ||
getClientManager().getClient().subscribe(subscriptions, null, null, listeners, null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
org/eclipse/paho/mqttv5/client/MqttAsyncClient.java:1220 - passes an empty new MqttProperties()
and then org/eclipse/paho/mqttv5/client/MqttAsyncClient.java:1276 throws no such element
That is good catch! And I can confirm the bug is there.
Here is a bug report related: eclipse-paho/paho.mqtt.java#826
I think this will be much better and cleaner, to get rid of this task scheduler and scheduled reconnects. |
thanks for confirmation. No, I don’t see an issue . This is a major release, so it is fully fine to some breaking changes. More over I believe most solution are based on just an url for MQTT broker in which case we add an auto-recconnect option internally . Either way we are going to document this properly . |
OK, and regarding |
yes, this makes sense . Let’s remove it then! Thank you for the pointer! Honestly, my idea was to implement the feature for v5. Confirm that it is OK and then rework v3 accordingly . Now it’s up to you to go ahead with your convenient way. |
As far as I understand, this functionality can be covered by |
Right , we just remove that option from the XSD as well. Let’s talk about an XML support when we are done with this. No reason to cover everything in one PR: the bigger it is - the slower review and merge process. |
Fixes spring-projects#3685 Other fixes and improvements after code review: * Changes around fields, methods, ctors visibility * Removed contradictory ctors * Reduce amount of unnecessary `getClientManager() != null` checks in logic and make it as similar as possible for client manager and the old approach * Use auto-reconnect where possible * Remove manual reconnect trigger and rely on events instead to know where to subscribe * Do not close the connection in adapter to be able to use reconnect logic without lose of subscriptions * Make `ClientManager` extend `MqttComponent` so that it knows about connection options as part of its contract * Remove not relevant auto test cases (relying on connection close or manual reconnect) * Other code style smaller changes
Phew, finally, another push... a lot of changes. Tests pass locally, finally |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The review for current state of code.
Again: I didn't review v3 changes, just because I didn't find v5 implemented correctly yet.
Thanks and sorry that this task became so complex.
...-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/ClientManager.java
Show resolved
Hide resolved
...-mqtt/src/main/java/org/springframework/integration/mqtt/core/AbstractMqttClientManager.java
Show resolved
Hide resolved
} | ||
|
||
protected Set<ConnectCallback> getCallbacks() { | ||
return Collections.unmodifiableSet(this.connectCallbacks); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method is protected
, so unlikely going to be used for some outside harm.
Therefore I don't see a reason in extra wrapping to prevent modification.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just the defense if someone decides to implement own client manager incorrectly, but I will remove it, no problem
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to implement
and to use
is not the same and there is likely less harm when it is an internal logic than some end-user API.
catch (MqttException e) { | ||
logger.error("could not start client manager, client_id=" + getClientId(), e); | ||
|
||
var applicationEventPublisher = getApplicationEventPublisher(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why no reconnect logic as we discussed?
Exactly the same what we have right now in the Mqttv5PahoMessageDrivenChannelAdapter.doStart()
.
That exactly was a purpose of our discussion around that setAutomaticReconnect(true)
option.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I don't see any reason to call reconnect
manually because if automatic reconnect is enabled, the client is reconnected automatically anyway (tests show it works). This reconnect
just zeroes out the timer.
Also if automatic reconnect is disabled, I thought we have discussed that the user gets info message about it and have to subscribe to MqttConnectionFailedEvent
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, see this issue for more info: #3822.
In two words: if client is not connected via first connect()
, the reconnect()
is not initiated.
That's why we have that logic in the Mqttv5PahoMessageDrivenChannelAdapter.doStart()
.
I wonder if you have modified the Mqttv5BackToBackAutomaticReconnectTests
to adjust its behavior to the logic you have changed.
However it should be like: "why this test is failing when I have removed reconnect()
"?
WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok I see now, I was really sure that there was no point for reconnect but this makes sense now. Indeed I should have made git blame, sometimes I think that smth might be implemented for a reason but not this time, haha.
return this.clientManagerConnectCallback; | ||
} | ||
|
||
protected void setClientManagerCallback(ClientManager.ConnectCallback clientManagerConnectCallback) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's not.
The AbstractMqttMessageDrivenChannelAdapter
creates such a ConnectCallback
internally and register it with the provided ClientManager
.
See, for example, an AbstractRSocketConnector
and its addEndpoint()
call from the RSocketInboundGateway
.
No reason to have some extra API around: we just call getClientManager().addCallback()
from target channel adapter impl.
I don't see, though, how you connect two of them here...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This setter and field is just to be able to save the listener instance to remove it later: see in destroy
method,
if (clientManager != null) {
clientManager.removeCallback(getClientManagerCallback());
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah! I see. Yep, you are right!
How about something like clientManager.addCallback(this)
and clientManager.removeCallback(this)
?
And have that ClientManager.ConnectCallback
impl directly on the AbstractMqttMessageDrivenChannelAdapter
.
This is an impl for just one more method which is going to delegate to another.
And there won't be those protected
settter and getter.
Or is this going to be too confusing from code perspective?
For end-user nothing changed though: they even not going to use this API in most cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think both approaches are not ideal 🙂 - but I guess having protected getters/setters is kinda an overkill for it.
I like your approach with (this) better. I will implement it.
.../src/main/java/org/springframework/integration/mqtt/outbound/AbstractMqttMessageHandler.java
Show resolved
Hide resolved
...tt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java
Outdated
Show resolved
Hide resolved
|
||
private static MqttConnectionOptions buildDefaultConnectionOptions(@Nullable String url) { | ||
final MqttConnectionOptions connectionOptions; | ||
connectionOptions = new MqttConnectionOptions(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why divide declaration and initialization to two separate lines?
...ng-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java
Show resolved
Hide resolved
.withCauseExactlyInstanceOf(MqttException.class) | ||
.withRootCauseExactlyInstanceOf(UnknownHostException.class); | ||
|
||
connectionOptions.setServerURIs(new String[]{ MosquittoContainerTest.mqttUrl() }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Didn't you remove this as a collateral damage just because you have removed a reconnect()
from the channel adapter when client is created internally?
catch (MqttException e) { | ||
logger.error("could not start client manager, client_id=" + getClientId(), e); | ||
|
||
var applicationEventPublisher = getApplicationEventPublisher(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, see this issue for more info: #3822.
In two words: if client is not connected via first connect()
, the reconnect()
is not initiated.
That's why we have that logic in the Mqttv5PahoMessageDrivenChannelAdapter.doStart()
.
I wonder if you have modified the Mqttv5BackToBackAutomaticReconnectTests
to adjust its behavior to the logic you have changed.
However it should be like: "why this test is failing when I have removed reconnect()
"?
WDYT?
return this.clientManagerConnectCallback; | ||
} | ||
|
||
protected void setClientManagerCallback(ClientManager.ConnectCallback clientManagerConnectCallback) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah! I see. Yep, you are right!
How about something like clientManager.addCallback(this)
and clientManager.removeCallback(this)
?
And have that ClientManager.ConnectCallback
impl directly on the AbstractMqttMessageDrivenChannelAdapter
.
This is an impl for just one more method which is going to delegate to another.
And there won't be those protected
settter and getter.
Or is this going to be too confusing from code perspective?
For end-user nothing changed though: they even not going to use this API in most cases.
...tt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java
Outdated
Show resolved
Hide resolved
.../src/main/java/org/springframework/integration/mqtt/outbound/AbstractMqttMessageHandler.java
Show resolved
Hide resolved
@@ -104,7 +92,7 @@ public static class Config { | |||
@Bean | |||
public MqttConnectionOptions mqttConnectOptions() { | |||
return new MqttConnectionOptionsBuilder() | |||
.serverURI("wss://badMqttUrl") | |||
.serverURI(MosquittoContainerTest.mqttUrl()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was really the point to have this test exactly this way.
Please, see my comment about remove reconnect()
an related issue when that feature and this test have need introduced.
And consider to reinstate that logic and this test.
Thank you!
Fixes spring-projects#3685 Other fixes and improvements after code review: * Get manual `reconnect` invocation back for v3/v5 adapters and client managers (see bug spring-projectsGH-3822 for a reasoning) * Remove unnecessary getters/setter for a listener and use adapter class as listener instead * Optimize MessageListener: remove redundant inner class and use a single method reference instead of N instances per each subscribe * Javadocs improvements
No need to add a header to every single commit. I prefer to add extra commits just with the messages about changes in this commit. This probably was just my bad habit to believe that people does not add comments into their commits. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good progress!
Thank you for your patience!
...-mqtt/src/main/java/org/springframework/integration/mqtt/core/AbstractMqttClientManager.java
Show resolved
Hide resolved
AbstractMqttClientManager(String clientId) { | ||
Assert.notNull(clientId, "'clientId' is required"); | ||
this.clientId = clientId; | ||
this.connectCallbacks = Collections.synchronizedSet(new HashSet<>()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought that I have commented before that this initialization could be done directly on the property declaration.
.waitForCompletion(this.connectionOptions.getConnectionTimeout()); | ||
} | ||
catch (MqttException e) { | ||
logger.error("could not start client manager, client_id=" + getClientId(), e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need this noise in logs since we are going to initiate a reconnect action.
Otherwise we are going to publish that MqttConnectionFailedEvent
.
We may log in the third (very rare) case when there is no applicationEventPublisher
provided.
this.clientId = null; | ||
} | ||
|
||
private Set<Topic> initTopics(String[] topic) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
static
?
this.connectionOptions = clientManager.getConnectionInfo(); | ||
} | ||
|
||
private static MqttConnectionOptions buildDefaultConnectionOptions(@Nullable String url) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we don't need this method any more.
Also I think we just need to add Assert.hasText(url, "'url' cannot be null or empty");
into that url-based ctor.
Just because we cannot an empty url without external options: or url, or options with server uris.
@@ -158,30 +185,39 @@ protected void onInit() { | |||
.getBean(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME, | |||
SmartMessageConverter.class)); | |||
} | |||
if (clientManager != null) { | |||
clientManager.addCallback(this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cannot we do this in a single place in the super.onInit()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea!
this.mqttClient.close(true); | ||
} | ||
} | ||
catch (MqttException ex) { | ||
logger.error(ex, "Failed to close 'MqttAsyncClient'"); | ||
} | ||
if (clientManager != null) { | ||
clientManager.removeCallback(this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here about moving this logic into a common super.destroy()
this.connectionOptions = clientManager.getConnectionInfo(); | ||
} | ||
|
||
private static MqttConnectionOptions buildDefaultConnectionOptions(@Nullable String url) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still think that we don't need this method and url
must not be null for that url-based ctor.
Exactly the same comment I did for the message-driven channel adapter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. Makes sense.
} | ||
else { | ||
if (!this.mqttClient.isConnected()) { | ||
this.mqttClient.connect(this.connectionOptions).waitForCompletion(completionTimeout); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we have to try to reconnect over here independently of the client source.
Even if client is provided by the manager, it is still legit to call this connect()
in case of lost connection in the client.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess you are right. Won't be harmful - except that if client is being connected in background, connect
method will throw an MqttException
. But this is probably a rare case.
* @author Artem Vozhdayenko | ||
* @since 6.0 | ||
*/ | ||
@LongRunningTest |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you going to remove this annotation?
How slow is the test right now?
* Extract common callback add/rm logic to abstract adapter class * Small code cleanups/fixes related to code style & simplicity, ctor inits and unnecessary methods; eliminate unnecessary logs noise * Remove `@LongRunningTest` for `ClientManagerBackToBackTests` as test run time is ~6-7 secs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some further review.
|
||
private String url; | ||
|
||
private volatile T client; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just another nit-pick: volatile
props go after all others 😄
} | ||
|
||
/** | ||
* Sets the phase of component autostart in {@link SmartLifecycle}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method Javadoc must be imperative, not infinitive: https://github.com/spring-projects/spring-framework/wiki/Code-Style#javadoc-formatting
...ration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java
Show resolved
Hide resolved
@@ -221,7 +250,8 @@ public void addTopic(String topic, int qos) { | |||
try { | |||
super.addTopic(topic, qos); | |||
if (this.mqttClient != null && this.mqttClient.isConnected()) { | |||
this.mqttClient.subscribe(topic, qos).waitForCompletion(getCompletionTimeout()); | |||
this.mqttClient.subscribe(new MqttSubscription(topic, qos), this::messageArrived) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we need to subscribe to newly added topics even if the client comes from the manager?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We init mqttClient
field from the client manager in the start method, just as for v3 adapter 🙂
So this code is almost not changed, at least no additional if
s
} | ||
} | ||
|
||
@Override | ||
protected void doStop() { | ||
try { | ||
this.mqttClient.disconnect().waitForCompletion(getDisconnectCompletionTimeout()); | ||
if (this.mqttClient != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since you do now:
else if (clientManager != null) {
this.mqttClient = clientManager.getClient();
}
in the doStart()
, we have to check here for the getClientManager() != null
before performing disconnect()
.
As we agreed: we cannot not close the client we don't owe.
Same applies for the close()
later on in the destroy()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right - changed it at the last moment and missed this part
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if the mqttClient != null
check is redundant in doStart
/doStop
/destroy
methods. It cannot be null from onInit
method - if client cannot be created we throw BeanCreationException
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's it.
Reviewed all of them.
Thank you!
this(buildDefaultClientFactory(url), clientId); | ||
} | ||
|
||
public Mqttv3ClientManager(MqttPahoClientFactory clientFactory, String clientId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we still need to rely on this factory for this our new ClientManager
abstraction.
How about just to make this v3 version similar to v5 - based on the MqttConnectionOptions
.
And perhaps we need to add one more MqttClientPersistence
option to these managers impls.
A setter should be OK.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, initially if I remember correctly you asked to implement client manager using existing client factory abstraction 😄
...ration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java
Show resolved
Hide resolved
...n/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java
Show resolved
Hide resolved
if (this.client instanceof MqttClient) { | ||
((MqttClient) this.client).setTimeToWait(getCompletionTimeout()); | ||
|
||
long completionTimeout = getCompletionTimeout(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No reason to extract this into a variable: used only in one place.
private void subscribe() { | ||
var clientManager = getClientManager(); | ||
if (clientManager != null && this.client == null) { | ||
this.client = clientManager.getClient(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we still need this logic in the subscribe()
. Looks like you are extracting this.client
in the connect()
, so should be OK here in the subscribe()
to not worry about that property value any more.
Do I miss anything?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess you're right, no sense to have it duplicated
IMqttToken subscribeToken = this.client.subscribe(topics, requestedQos, listeners); | ||
subscribeToken.waitForCompletion(getCompletionTimeout()); | ||
int[] grantedQos = subscribeToken.getGrantedQos(); | ||
if (grantedQos.length == 1 && grantedQos[0] == 0x80) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Magic number? // NOSONAR
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an interesting case 🙂
Initially v3 adapter had a blocking client impl and I have changed it to use async client.
One test case started failing after that, it was MqttAdapterTests#testSubscribeFailure
.
You can see it mocks this magic number 0x80
:
given(token.getGrantedQos()).willReturn(new int[]{ 0x80 });
I have looked through the code of a blocking client and it is just a wrapper around async one - but it really has this strange difference in subscribe
method, /org/eclipse/paho/client/mqttv3/MqttClient.java:465
if (grantedQos.length == 1 && qos[0] == 0x80) {
throw new MqttException(MqttException.REASON_CODE_SUBSCRIBE_FAILED);
}
So I decided to keep the behavior, especially given we had a unit test for it.
I will add //NOSONAR
for now..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure! Makes sense. Let's live with this for now only in v3 impl!
.mapToObj(t -> listener) | ||
.toArray(IMqttMessageListener[]::new); | ||
this.mqttClient.subscribe(subscriptions, null, null, listeners, null) | ||
.waitForCompletion(getCompletionTimeout()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we need to do the same subscribeToken.getGrantedQos()
logic over here as well, like you have introduced in v3 impl?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For v5 we didn't have it initially. So I am not sure. Probably not?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, I don't find this 0x80
in the blocking client in v5 as well. Probably it was something related to v3 only.
plain connection properties and client persistence instead * Add missed javadocs * Other code style & cleanup improvements
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that's it.
I'll pull this locally today for final review and possible merging.
Thank you very much for a great work!
this.clientId = clientId; | ||
} | ||
|
||
protected void setManualAcks(boolean manualAcks) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe that this one has to be public
Merged as 5f12729 after some minor code clean up and refactoring. @oxcafedead , |
Fixes #3685
Introduce some initial design.
Add a new interface
MqttClientManager
which will manage clients andconnections.
Use this manager in v3 topic adapter and message handler.