The Mqttv5PahoMessageHandler.publish() fails with a Connect already in progress (32110) under high concurrency #9597
Closed
Description
When the input channel for the endpoint with Mqttv5PahoMessageHandler is an ExecutorChannel and many messages are sent concurrently, the following exception can be thrown:
2024-10-25 16:41:31.164 [][] ERROR o.s.i.handler.LoggingHandler:250 - org.springframework.messaging.MessageHandlingException: Failed to publish to MQTT in the [bean 'mqttOutboundHandler' for component 'mqttOutboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'; defined in: 'class path resource [com/rms/config/MqttConfig.class]'; from source: 'com.rms.config.MqttConfig.mqttOutboundHandler(org.eclipse.paho.mqttv5.client.MqttConnectionOptions)'], failedMessage=GenericMessage [payload= MQTT MSG 510 from Spring Boot!, headers={replyChannel=nullChannel, errorChannel=, id=c38c5f3b-d4fb-56ac-6abb-f24e3c4d11d6, mqtt_topic=test/, timestamp=1729888891103}]
at org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler.publish(Mqttv5PahoMessageHandler.java:283)
at org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler.handleMessageInternal(Mqttv5PahoMessageHandler.java:222)
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
at org.springframework.integration.dispatcher.UnicastingDispatcher$1.run(UnicastingDispatcher.java:129)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:56)
at java.base/java.util.concurrent.ThreadPerTaskExecutor$TaskRunner.run(ThreadPerTaskExecutor.java:314)
at java.base/java.lang.VirtualThread.run(VirtualThread.java:309)
Caused by: Connect already in progress (32110)
at org.eclipse.paho.mqttv5.client.MqttAsyncClient.connect(MqttAsyncClient.java:734)
at org.eclipse.paho.mqttv5.client.MqttAsyncClient.connect(MqttAsyncClient.java:715)
at org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler.publish(Mqttv5PahoMessageHandler.java:269)
... 9 more
The code:
    if (!this.mqttClient.isConnected()) {
        this.mqttClient.connect(this.connectionOptions).waitForCompletion(completionTimeout);
    }
has to be guarded with a Lock to prevent concurrent connect() calls.
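A minimal sketch of what such a guard could look like, using only the JDK. This is not the handler's actual code: FakeClient is a hypothetical stand-in for the Paho client that rejects overlapping connect() calls, and the names GuardedConnectSketch, connectLock and runConcurrentPublishes are made up for illustration. The connected check is repeated inside the lock so that threads which waited for the lock do not reconnect.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class GuardedConnectSketch {

    /** Hypothetical stand-in for the MQTT client: overlapping connect() calls fail. */
    static class FakeClient {
        private final AtomicBoolean connecting = new AtomicBoolean();
        private volatile boolean connected;
        final AtomicInteger connectCalls = new AtomicInteger();

        boolean isConnected() {
            return this.connected;
        }

        void connect() throws InterruptedException {
            if (!this.connecting.compareAndSet(false, true)) {
                throw new IllegalStateException("Connect already in progress");
            }
            try {
                this.connectCalls.incrementAndGet();
                Thread.sleep(50); // simulate the time a real handshake takes
                this.connected = true;
            } finally {
                this.connecting.set(false);
            }
        }
    }

    private final Lock connectLock = new ReentrantLock();
    final FakeClient client = new FakeClient();

    /** Connects at most once, even when many threads arrive while disconnected. */
    void publish() throws InterruptedException {
        if (!this.client.isConnected()) {
            this.connectLock.lock();
            try {
                // Re-check: another thread may have connected while we waited.
                if (!this.client.isConnected()) {
                    this.client.connect();
                }
            } finally {
                this.connectLock.unlock();
            }
        }
        // The actual publish call would follow here.
    }

    /** Fires the given number of concurrent publish() calls; returns how many failed. */
    int runConcurrentPublishes(int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        AtomicInteger failures = new AtomicInteger();
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    start.await(); // release all workers at the same moment
                    publish();
                } catch (Exception ex) {
                    failures.incrementAndGet();
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Workers did not finish in time");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return failures.get();
    }

    public static void main(String[] args) {
        GuardedConnectSketch sketch = new GuardedConnectSketch();
        int failures = sketch.runConcurrentPublishes(16);
        System.out.println("failures=" + failures + " connects=" + sketch.client.connectCalls.get());
    }
}
```

Removing the lock from publish() in this sketch lets several workers reach connect() at once, which is the same race that surfaces as error 32110 in the stack trace above.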
The workaround is to add a RequestHandlerRetryAdvice to this endpoint.
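For illustration, the workaround might be wired roughly as follows with the Java DSL. This is a configuration sketch under assumptions, not taken from the reporter's project: it assumes Spring Integration 6.x with spring-retry on the classpath, a channel bean named mqttOutboundChannel (hypothetical name) that is the ExecutorChannel in question, and arbitrary retry settings of 3 attempts with a 200 ms fixed back-off.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler;
import org.springframework.retry.support.RetryTemplate;

@Configuration
public class MqttRetryWorkaroundConfig {

    // Retries the handler call when it fails, e.g. on "Connect already in progress".
    @Bean
    public RequestHandlerRetryAdvice mqttRetryAdvice() {
        RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
        advice.setRetryTemplate(RetryTemplate.builder()
                .maxAttempts(3)      // assumed value, tune as needed
                .fixedBackoff(200)   // milliseconds, assumed value
                .build());
        return advice;
    }

    // "mqttOutboundChannel" is a hypothetical name for the existing ExecutorChannel bean.
    @Bean
    public IntegrationFlow mqttOutboundFlow(Mqttv5PahoMessageHandler mqttOutboundHandler,
            RequestHandlerRetryAdvice mqttRetryAdvice) {

        return IntegrationFlow.from("mqttOutboundChannel")
                .handle(mqttOutboundHandler, e -> e.advice(mqttRetryAdvice))
                .get();
    }
}
```

With this in place, a publish that loses the connect race would be attempted again after the back-off, by which time the connection started by the winning thread has most likely completed.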